Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Commit

Permalink
Start refactoring the thrift API for shards a bit
Browse files Browse the repository at this point in the history
tests still dont pass. jankiness
  • Loading branch information
Ed Ceaser committed Mar 18, 2010
1 parent cc3026e commit 9339a5d
Show file tree
Hide file tree
Showing 13 changed files with 73 additions and 77 deletions.
Expand Up @@ -13,7 +13,7 @@ class CachingNameServer(nameServer: ManagingNameServer, mappingFunction: Long =>

@volatile protected var shardInfos = mutable.Map.empty[Int, ShardInfo]
@volatile private var familyTree: scala.collection.Map[Int, Seq[ChildInfo]] = null
@volatile private var forwardings: scala.collection.Map[List[Int], TreeMap[Long, ShardInfo]] = null
@volatile private var forwardings: scala.collection.Map[Int, TreeMap[Long, ShardInfo]] = null

reload()

Expand All @@ -27,7 +27,7 @@ class CachingNameServer(nameServer: ManagingNameServer, mappingFunction: Long =>

val newFamilyTree = nameServer.listShardChildren()

val newForwardings = new mutable.HashMap[List[Int], TreeMap[Long, ShardInfo]]
val newForwardings = new mutable.HashMap[Int, TreeMap[Long, ShardInfo]]
nameServer.getForwardings().foreach { forwarding =>
val treeMap = newForwardings.getOrElseUpdate(forwarding.tableId, new TreeMap[Long, ShardInfo])
treeMap.put(forwarding.baseId, newShardInfos(forwarding.shardId))
Expand All @@ -38,7 +38,7 @@ class CachingNameServer(nameServer: ManagingNameServer, mappingFunction: Long =>
forwardings = newForwardings
}

def findCurrentForwarding(tableId: List[Int], id: Long) = {
def findCurrentForwarding(tableId: Int, id: Long) = {
forwardings.get(tableId).flatMap { bySourceIds =>
val item = bySourceIds.floorEntry(mappingFunction(id))
if (item != null) {
Expand All @@ -64,7 +64,7 @@ class CachingNameServer(nameServer: ManagingNameServer, mappingFunction: Long =>
def markShardBusy(shardId: Int, busy: Busy.Value) = nameServer.markShardBusy(shardId, busy)
def setForwarding(forwarding: Forwarding) = nameServer.setForwarding(forwarding)
def replaceForwarding(oldShardId: Int, newShardId: Int) = nameServer.replaceForwarding(oldShardId, newShardId)
def getForwarding(tableId: List[Int], baseId: Long) = nameServer.getForwarding(tableId, baseId)
def getForwarding(tableId: Int, baseId: Long) = nameServer.getForwarding(tableId, baseId)
def getForwardingForShard(shardId: Int) = nameServer.getForwardingForShard(shardId)
def getForwardings() = nameServer.getForwardings()
def shardIdsForHostname(hostname: String, className: String) = nameServer.shardIdsForHostname(hostname, className)
Expand Down
@@ -1,4 +1,4 @@
package com.twitter.gizzard.nameserver


case class Forwarding(tableId: List[Int], baseId: Long, shardId: Int)
case class Forwarding(serviceId: Int, tableId: Int, baseId: Long, shardId: Int)
Expand Up @@ -4,7 +4,7 @@ import shards._


trait ForwardingNameServer {
def findCurrentForwarding(tableId: List[Int], id: Long): ShardInfo
def findCurrentForwarding(tableId: Int, id: Long): ShardInfo
def getShardInfo(shardId: Int): ShardInfo
// def findShardById(shardId: Int, weight: Int): ShardInfo
// def findShardById(shardId: Int): ShardInfo = findShardById(shardId, 1)
Expand Down
Expand Up @@ -16,7 +16,7 @@ trait ManagingNameServer extends Shard {
def markShardBusy(shardId: Int, busy: Busy.Value)
def setForwarding(forwarding: Forwarding)
def replaceForwarding(oldShardId: Int, newShardId: Int)
def getForwarding(tableId: List[Int], baseId: Long): ShardInfo
def getForwarding(tableId: Int, baseId: Long): ShardInfo
def getForwardingForShard(shardId: Int): Forwarding
def getForwardings(): List[Forwarding]
def shardIdsForHostname(hostname: String, className: String): List[Int]
Expand Down
Expand Up @@ -7,7 +7,7 @@ trait ReadWriteNameServer extends ManagingNameServer with ReadWriteShard[Managin
def findShard(shardInfo: ShardInfo) = readOperation(_.findShard(shardInfo))
def getBusyShards() = readOperation(_.getBusyShards())
def getChildShardsOfClass(parentShardId: Int, className: String) = readOperation(_.getChildShardsOfClass(parentShardId, className))
def getForwarding(tableId: List[Int], baseId: Long) = readOperation(_.getForwarding(tableId, baseId))
def getForwarding(tableId: Int, baseId: Long) = readOperation(_.getForwarding(tableId, baseId))
def getForwardingForShard(shardId: Int) = readOperation(_.getForwardingForShard(shardId))
def getForwardings() = readOperation(_.getForwardings())
def getParentShard(shardId: Int) = readOperation(_.getParentShard(shardId))
Expand Down
83 changes: 35 additions & 48 deletions src/main/scala/com/twitter/gizzard/nameserver/SqlNameServer.scala
Expand Up @@ -2,7 +2,6 @@ package com.twitter.gizzard.nameserver

import java.sql.{ResultSet, SQLException, SQLIntegrityConstraintViolationException}
import com.twitter.querulous.evaluator.QueryEvaluator
import net.lag.logging.Logger
import jobs.JobScheduler
import scala.collection.mutable
import shards._
Expand Down Expand Up @@ -36,38 +35,32 @@ CREATE TABLE shard_children (
) ENGINE=INNODB
/* ALTER TABLE shard_children ADD weight INT NOT NULL DEFAULT 1; */
"""
private val log = Logger.get(getClass.getName)


def rebuildSchema(queryEvaluator: QueryEvaluator) {
queryEvaluator.execute("DROP TABLE IF EXISTS shards")
queryEvaluator.execute("DROP TABLE IF EXISTS shard_children")
queryEvaluator.execute(SHARDS_DDL)
queryEvaluator.execute(SHARD_CHILDREN_DDL)
}
}

class SqlNameServer(queryEvaluator: QueryEvaluator, tablePrefix: String)
class SqlNameServer(queryEvaluator: QueryEvaluator, shardMaterializerFactory: ShardInfo => Unit)
extends ManagingNameServer {
val children = List()
val shardInfo = new ShardInfo("com.twitter.gizzard.nameserver.SqlNameServer", tablePrefix, "")
val shardInfo = new ShardInfo("com.twitter.gizzard.nameserver.SqlNameServer", "", "")
val weight = 1 // hardcode for now

val FORWARDINGS_DDL = """
CREATE TABLE """ + tablePrefix + """_forwardings (
CREATE TABLE forwardings (
service_id INT NOT NULL,
base_source_id BIGINT NOT NULL,
table_id VARCHAR(255) NOT NULL,
table_id INT NOT NULL,
shard_id INT NOT NULL,
PRIMARY KEY (base_source_id, table_id),
PRIMARY KEY (service_id, base_source_id, table_id),
UNIQUE unique_shard_id (shard_id)
) ENGINE=INNODB;
"""

val SEQUENCE_DDL = """
CREATE TABLE IF NOT EXISTS """ + tablePrefix + """_sequence (
id INT UNSIGNED NOT NULL
CREATE TABLE IF NOT EXISTS sequences (
service_id INT NOT NULL,
id INT UNSIGNED NOT NULL,
PRIMARY KEY (service_id)
) ENGINE=INNODB;
"""

Expand All @@ -78,10 +71,13 @@ CREATE TABLE IF NOT EXISTS """ + tablePrefix + """_sequence (
}

private def rowToForwarding(row: ResultSet) = {
new Forwarding(List.fromString(row.getString("table_id"), '.').map(_.toInt), row.getLong("base_source_id"), row.getInt("shard_id"))
new Forwarding(0, row.getInt("table_id"), row.getLong("base_source_id"), row.getInt("shard_id"))
}

private def rowToChildInfo(row: ResultSet) = {
new ChildInfo(row.getInt("child_id"), row.getInt("weight"))
}

private val forwardingTable = tablePrefix + "_forwardings"
private def tableIdDbString(tableId: List[Int]) = tableId.mkString(".")
private def tableIdTuple(tableId: List[Int]) = (tableId(0), tableId(1))

Expand All @@ -96,7 +92,7 @@ CREATE TABLE IF NOT EXISTS """ + tablePrefix + """_sequence (
} getOrElse {
transaction.insert("INSERT INTO shards (class_name, table_prefix, hostname, source_type, destination_type) VALUES (?, ?, ?, ?, ?)", shardInfo.className, shardInfo.tablePrefix, shardInfo.hostname, shardInfo.sourceType, shardInfo.destinationType)
}
// shardRepository.create(shardInfo)
shardMaterializerFactory(shardInfo)
shardId
} catch {
case e: SQLIntegrityConstraintViolationException =>
Expand Down Expand Up @@ -158,14 +154,14 @@ CREATE TABLE IF NOT EXISTS """ + tablePrefix + """_sequence (
val map = mutable.HashMap.empty[Int, mutable.ArrayBuffer[ChildInfo]]
queryEvaluator.select("SELECT parent_id, child_id, weight FROM shard_children ORDER BY parent_id, child_id") { row =>
val arrayBuffer = map.getOrElseUpdate(row.getInt("parent_id"), new mutable.ArrayBuffer[ChildInfo]())
arrayBuffer += new ChildInfo(row.getInt("child_id"), row.getInt("weight"))
arrayBuffer += rowToChildInfo(row)
}
map
}

def listShardChildren(shardId: Int) = {
queryEvaluator.select("SELECT child_id, weight FROM shard_children WHERE parent_id = ? ORDER BY weight DESC", shardId) { row =>
new ChildInfo(row.getInt("child_id"), row.getInt("weight"))
rowToChildInfo(row)
}.toList
}

Expand All @@ -182,47 +178,44 @@ CREATE TABLE IF NOT EXISTS """ + tablePrefix + """_sequence (
}

def setForwarding(forwarding: Forwarding) {
if (queryEvaluator.execute("UPDATE " + forwardingTable + " SET shard_id = ? WHERE base_source_id = ? AND table_id = ?",
forwarding.shardId, forwarding.baseId, tableIdDbString(forwarding.tableId)) == 0) {
queryEvaluator.execute("INSERT INTO " + forwardingTable + " (base_source_id, table_id, shard_id) VALUES (?, ?, ?)",
forwarding.baseId, tableIdDbString(forwarding.tableId), forwarding.shardId)
if (queryEvaluator.execute("UPDATE forwardings SET shard_id = ? WHERE base_source_id = ? AND table_id = ?",
forwarding.shardId, forwarding.baseId, forwarding.tableId) == 0) {
queryEvaluator.execute("INSERT INTO forwardings (base_source_id, table_id, shard_id) VALUES (?, ?, ?)",
forwarding.baseId, forwarding.tableId, forwarding.shardId)
}
}

def replaceForwarding(oldShardId: Int, newShardId: Int) {
queryEvaluator.execute("UPDATE " + forwardingTable + " SET shard_id = ? WHERE shard_id = ?", newShardId, oldShardId)
queryEvaluator.execute("UPDATE forwardings SET shard_id = ? WHERE shard_id = ?", newShardId, oldShardId)
}

def getForwarding(tableId: List[Int], baseId: Long): ShardInfo = {
getShard(queryEvaluator.select("SELECT shard_id FROM " + forwardingTable + " WHERE base_source_id = ? AND table_id = ?",
baseId, tableIdDbString(tableId)) { row =>
def getForwarding(tableId: Int, baseId: Long): ShardInfo = {
getShard(queryEvaluator.select("SELECT shard_id FROM forwardings WHERE base_source_id = ? AND table_id = ?", baseId, tableId) { row =>
row.getInt("shard_id")
}.firstOption.getOrElse { throw new ShardException("No such forwarding") })
}

def getForwardingForShard(shardId: Int): Forwarding = {
queryEvaluator.select("SELECT * FROM " + forwardingTable + " WHERE shard_id = ?", shardId) { row =>
queryEvaluator.select("SELECT * FROM forwardings WHERE shard_id = ?", shardId) { row =>
rowToForwarding(row)
}.firstOption.getOrElse { throw new ShardException("No such forwarding") }
}

def getForwardings(): List[Forwarding] = {
// XXX: REVISIT ORDERING
queryEvaluator.select("SELECT * FROM " + forwardingTable + " ORDER BY table_id, base_source_id DESC") { row =>
queryEvaluator.select("SELECT * FROM forwardings ORDER BY table_id, base_source_id DESC") { row =>
rowToForwarding(row)
}.toList
}


def shardIdsForHostname(hostname: String, className: String): List[Int] = {
val shardIds = new mutable.ListBuffer[Int]
queryEvaluator.select("SELECT id FROM shards WHERE hostname = ? AND class_name = ?", hostname, className) { row =>
row.getInt("id")
}.toList
}

def shardsForHostname(hostname: String, className: String): List[ShardInfo] = {
val shardIds = new mutable.ListBuffer[Int]
queryEvaluator.select("SELECT * FROM shards WHERE hostname = ? AND class_name = ?", hostname, className) { row =>
rowToShardInfo(row)
}.toList
Expand Down Expand Up @@ -271,21 +264,15 @@ CREATE TABLE IF NOT EXISTS """ + tablePrefix + """_sequence (
reloadForwardings()
} */

/* def findShardById(shardId: Int, weight: Int): S = {
val shardInfo = SqlNameServer.shardInfos.getOrElse(shardId, throw new NonExistentShard)
val children =
SqlNameServer.familyTree.getOrElse(shardId, new mutable.ArrayBuffer[ChildInfo]).map { child =>
findShardById(child.shardId, child.weight)
}.toList
shardRepository.find(shardInfo, weight, children)
} */

def rebuildSchema() {
SqlNameServer.rebuildSchema(queryEvaluator)
queryEvaluator.execute("DROP TABLE IF EXISTS " + forwardingTable)
queryEvaluator.execute("DROP TABLE IF EXISTS " + tablePrefix + "_sequence")
queryEvaluator.execute("DROP TABLE IF EXISTS shards")
queryEvaluator.execute("DROP TABLE IF EXISTS shard_children")
queryEvaluator.execute(SqlNameServer.SHARDS_DDL)
queryEvaluator.execute(SqlNameServer.SHARD_CHILDREN_DDL)
queryEvaluator.execute("DROP TABLE IF EXISTS forwardings")
queryEvaluator.execute("DROP TABLE IF EXISTS sequences")
queryEvaluator.execute(FORWARDINGS_DDL)
queryEvaluator.execute(SEQUENCE_DDL)
queryEvaluator.execute("INSERT INTO " + tablePrefix + "_sequence VALUES (0)")
// queryEvaluator.execute("INSERT INTO_sequences VALUES (0)")
}
}
4 changes: 4 additions & 0 deletions src/main/scala/com/twitter/gizzard/shards/ShardFactory.scala
Expand Up @@ -5,3 +5,7 @@ trait ShardFactory[S <: Shard] {
def instantiate(shardInfo: ShardInfo, weight: Int, children: Seq[Shard]): S
def materialize(shardInfo: ShardInfo)
}

trait ShardMaterializer {
def materialize(shardInfo: ShardInfo)
}
Expand Up @@ -78,8 +78,8 @@ class ShardManagerService(nameServer: CachingNameServer/*, copyManager: CopyMana
nameServer.replaceForwarding(oldShardId, newShardId)
}

def get_forwarding(tableId: java.util.List[java.lang.Integer], baseId: Long) = {
nameServer.getForwarding(tableId.toList, baseId).toThrift
def get_forwarding(tableId: Int, baseId: Long) = {
nameServer.getForwarding(tableId, baseId).toThrift
}

def get_forwarding_for_shard(shardId: Int) = {
Expand All @@ -95,8 +95,8 @@ class ShardManagerService(nameServer: CachingNameServer/*, copyManager: CopyMana
nameServer.reload()
}

def find_current_forwarding(tableId: java.util.List[java.lang.Integer], id: Long) = {
nameServer.findCurrentForwarding(tableId.toList, id).toThrift
def find_current_forwarding(tableId: Int, id: Long) = {
nameServer.findCurrentForwarding(tableId, id).toThrift
}

def shard_ids_for_hostname(hostname: String, className: String): java.util.List[java.lang.Integer] = {
Expand Down
Expand Up @@ -5,12 +5,12 @@ import com.twitter.gizzard.thrift.conversions.Sequences._

object Forwarding {
class RichShardingForwarding(forwarding: nameserver.Forwarding) {
def toThrift = new thrift.Forwarding(forwarding.tableId.toJavaList, forwarding.baseId, forwarding.shardId)
def toThrift = new thrift.Forwarding(forwarding.serviceId, forwarding.tableId, forwarding.baseId, forwarding.shardId)
}
implicit def shardingForwardingToRichShardingForwarding(forwarding: nameserver.Forwarding) = new RichShardingForwarding(forwarding)

class RichThriftForwarding(forwarding: thrift.Forwarding) {
def fromThrift = new nameserver.Forwarding(forwarding.table_id.toList, forwarding.base_id, forwarding.shard_id)
def fromThrift = new nameserver.Forwarding(forwarding.service_id, forwarding.table_id, forwarding.base_id, forwarding.shard_id)
}
implicit def thriftForwardingToRichThriftForwarding(forwarding: thrift.Forwarding) = new RichThriftForwarding(forwarding)
}
11 changes: 6 additions & 5 deletions src/main/thrift/ShardManager.thrift
Expand Up @@ -28,9 +28,10 @@ struct ShardMigration {
}

struct Forwarding {
1: list<i32> table_id
2: i64 base_id
3: i32 shard_id
1: i32 service_id
2: i32 table_id
3: i64 base_id
4: i32 shard_id
}

service ShardManager {
Expand All @@ -53,11 +54,11 @@ service ShardManager {

void set_forwarding(1: Forwarding forwarding) throws(ShardException ex)
void replace_forwarding(1: i32 old_shard_id, 2: i32 new_shard_id) throws(ShardException ex)
ShardInfo get_forwarding(1: list<i32> table_id, 2: i64 base_id) throws(ShardException ex)
ShardInfo get_forwarding(1: i32 table_id, 2: i64 base_id) throws(ShardException ex)
Forwarding get_forwarding_for_shard(1: i32 shard_id) throws(ShardException ex)
list<Forwarding> get_forwardings() throws(ShardException ex)
void reload_forwardings() throws(ShardException ex)
ShardInfo find_current_forwarding(1: list<i32> table_id, 2: i64 id) throws(ShardException ex)
ShardInfo find_current_forwarding(1: i32 table_id, 2: i64 id) throws(ShardException ex)

list<i32> shard_ids_for_hostname(1: string hostname, 2: string class_name) throws(ShardException ex)
list<ShardInfo> shards_for_hostname(1: string hostname, 2: string class_name) throws(ShardException ex)
Expand Down
Expand Up @@ -36,7 +36,7 @@ object NameServerSpec extends Specification with JMocker with ClassMocker with D
}

shardRepository = mock[ShardRepository[Shard]]
nameServer = new SqlNameServer(queryEvaluator, "test")
nameServer = new SqlNameServer(queryEvaluator, (a: ShardInfo) => ())

// nameServer.reload()

Expand Down Expand Up @@ -178,7 +178,8 @@ object NameServerSpec extends Specification with JMocker with ClassMocker with D
}

shardId = nameServer.createShard(forwardShardInfo)
forwarding = new Forwarding(List(1, 2), 0L, shardId)
// forwarding = new Forwarding(0, List(1, 2), 0L, shardId)
forwarding = new Forwarding(0, 1, 0L, shardId)
}

"setForwarding" in {
Expand All @@ -195,7 +196,7 @@ object NameServerSpec extends Specification with JMocker with ClassMocker with D

"getForwarding" in {
nameServer.setForwarding(forwarding)
nameServer.getForwarding(List(1, 2), 0L).shardId mustEqual shardId
nameServer.getForwarding(4, 0L).shardId mustEqual shardId
}

"getForwardingForShard" in {
Expand Down
Expand Up @@ -11,7 +11,7 @@ import nameserver.{ManagingNameServer, SqlNameServer, ShardRepository}

object ShardsIntegrationSpec extends Specification with JMocker with ClassMocker with Database {

SqlNameServer.rebuildSchema(queryEvaluator)
// SqlNameServer.rebuildSchema(queryEvaluator)

val shardInfo1 = new ShardInfo("com.example.UserShard", "table1", "localhost")
val shardInfo2 = new ShardInfo("com.example.UserShard", "table2", "localhost")
Expand Down Expand Up @@ -44,7 +44,7 @@ object ShardsIntegrationSpec extends Specification with JMocker with ClassMocker
shardRepository = new ShardRepository
shardRepository += (("com.example.UserShard", factory))
shardRepository += (("com.example.SqlShard", factory))
nameServer = new SqlNameServer(queryEvaluator, "test")
nameServer = new SqlNameServer(queryEvaluator, (a: ShardInfo) => ())

nameServer.createShard(shardInfo1)
nameServer.createShard(shardInfo2)
Expand Down

0 comments on commit 9339a5d

Please sign in to comment.