mcast: support 50/50 r/w; replica ack: write-through and write-back(*); start writing time fix

alex-aizman committed May 14, 2016
1 parent 4b8486e commit 1ae138883ade64601782fe494edc8eda4756f363
Showing with 91 additions and 41 deletions.
  1. +10 −7 config.go
  2. +0 −1 event.go
  3. +6 −1 log.go
  4. +2 −10 m5.go
  5. +19 −4 m7.go
  6. +41 −13 ma.go
  7. +13 −5 node.go
@@ -71,7 +71,7 @@ type ConfigStorage struct {
sizeDataChunk, sizeMetaChunk int
diskMBps int
maxDiskQueue int
chunksInFlight int // TODO: UCH-* models to start next chunk without waiting for ACK..
read bool // false => 100% write | true => 50% read, 50% write
// derived from other config, for convenience
dskdurationDataChunk time.Duration
dskdurationFrame time.Duration
@@ -80,12 +80,12 @@ type ConfigStorage struct {
}

var configStorage = ConfigStorage{
numReplicas: 3,
sizeDataChunk: 128, // KB
sizeMetaChunk: 1, // KB
diskMBps: 400, // MB/sec
maxDiskQueue: 256, // KB
chunksInFlight: 1, // TODO: limits total in-flight for a given gateway
numReplicas: 3,
sizeDataChunk: 128, // KB
sizeMetaChunk: 1, // KB
diskMBps: 400, // MB/sec
maxDiskQueue: 256, // KB
read: false,
}

//
@@ -186,6 +186,8 @@ func init() {
chunksizePtr := flag.Int("chunksize", configStorage.sizeDataChunk, "chunk size (KB)")
diskthPtr := flag.Int("diskthroughput", configStorage.diskMBps, "disk throughput (MB/sec)")

readPtr := flag.Bool("r", configStorage.read, "read=false(100% write) | true(50/50% read/write)")

diskQueuePtr := flag.Int("diskqueue", configStorage.maxDiskQueue, "disk queue size (KB)")

l2framePtr := flag.Int("l2frame", configNetwork.sizeFrame, "L2 frame size (bytes)")
@@ -237,6 +239,7 @@ func init() {
configStorage.numReplicas = *replicasPtr
configStorage.sizeDataChunk = *chunksizePtr
configStorage.diskMBps = *diskthPtr
configStorage.read = *readPtr
configStorage.maxDiskQueue = *diskQueuePtr

configNetwork.sizeFrame = *l2framePtr
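
The only new knob in config.go is the boolean -r switch. A minimal, self-contained sketch of how such a flag behaves; the flag name, default, and usage string come from the hunk above, while the rest of the simulator wiring is omitted and the program below is only a stand-in:

```go
package main

import (
	"flag"
	"fmt"
)

// Stand-in for configStorage.read: same flag name, default, and usage
// string as in init() above; everything downstream is omitted.
var readPtr = flag.Bool("r", false, "read=false(100% write) | true(50/50% read/write)")

func main() {
	flag.Parse()
	if *readPtr {
		fmt.Println("workload: 50% read, 50% write")
	} else {
		fmt.Println("workload: 100% write")
	}
}
```

With -r=true the flag flips configStorage.read, which the m7 and ma changes below use to turn one replica per chunk into a simulated read.
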
@@ -281,7 +281,6 @@ func newReplicaPutAckEvent(srv RunnerInterface, gwy RunnerInterface, flow *Flow,
at := atnet + atdisk
timedev := newTimedAnyEvent(srv, at, gwy, tio, configNetwork.sizeControlPDU)
assert(flow.cid == tio.cid)
assert(flow.repnum == tio.repnum)
return &ReplicaPutAckEvent{zControlEvent{zEvent{*timedev}, flow.cid}, flow.repnum}
}

7 log.go
@@ -53,9 +53,14 @@ func nameLog(n string) {
x /= 1024
s = "M"
}
config.LogFile = "/tmp/" + fmt.Sprintf("log-m%s-%dx%d-%v-%d%s-%dMs-%s.csv", n,
read := ""
if configStorage.read {
read = "read50pct-"
}
config.LogFile = "/tmp/" + fmt.Sprintf("log-m%s-%dx%d-%v-%d%s-%dMs-%s%s.csv", n,
config.numGateways, config.numServers, config.timeToRun,
x, s, configStorage.diskMBps,
read,
build)
}
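
With reads enabled, nameLog() now stamps a read50pct- marker into the log file name. A throwaway sketch of the resulting name; only the format string and the suffix come from the hunk above, the cluster size, run time, chunk size, disk throughput, and build string are made-up values:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical inputs -- only the format string and the read50pct-
	// suffix are taken from nameLog() above.
	n, numGateways, numServers := "7", 10, 10
	timeToRun := 1 * time.Second
	x, s, diskMBps := 128, "K", 400
	read, build := "read50pct-", "deadbeef"
	logFile := "/tmp/" + fmt.Sprintf("log-m%s-%dx%d-%v-%d%s-%dMs-%s%s.csv", n,
		numGateways, numServers, timeToRun,
		x, s, diskMBps,
		read,
		build)
	fmt.Println(logFile) // /tmp/log-m7-10x10-1s-128K-400Ms-read50pct-deadbeef.csv
}
```
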

12 m5.go
@@ -28,10 +28,6 @@
// The UCH-CCPi pipeline includes 3 control events per each replica of
// each chunk, details in the code.
//
// TODO: configStorage.chunksInFlight
// add support for multiple chunks in-flight, with gateways starting
// to transmit without waiting for completions
//
package surge

import (
@@ -124,12 +120,8 @@ func (r *gatewayFive) Run() {
lastRefill := Now
for r.state == RstateRunning {
if r.chunk == nil {
// the gateway does currently one chunk at a time;
// configStorage.chunksInFlight > 1 is not supported yet
//
// if there no chunk in flight (r.chunk == nil)
// we must make sure the gateway's rate bucket has
// at least sizeControlPDU bits to send the new PUT..
// make sure the gateway's rate bucket has
// at least sizeControlPDU bits to send
if r.rb.above(int64(configNetwork.sizeControlPDU * 8)) {
r.startNewChunk()
}
23 m7.go
@@ -234,12 +234,21 @@ func (r *gatewaySeven) M7receivebid(ev EventInterface) error {
mcastflow := tioparent.flow
mcastflow.extension = computedbid

// to read or not to read? round robin between selected servers, if 50% reading requested via CLI
read_k := -1
if configStorage.read {
read_k = int(r.chunk.cid % int64(configStorage.numReplicas))
}
// fill in the multicast rendezvous group for chunk data transfer
// note that pending[] bids at this point are already "filtered"
// to contain only those bids that were selected
ids := make([]int, configStorage.numReplicas)
for k := 0; k < configStorage.numReplicas; k++ {
bid := r.bids.pending[k]
if k == read_k {
bid.tio.repnum = 50 // FIXME
log("read-pre", bid.tio.String())
}
assert(ngobj.hasmember(bid.tio.target))
ids[k] = bid.tio.target.GetID()

@@ -401,12 +410,18 @@ func (r *serverSeven) Run() {

log(LogVV, "SRV::rxcallback: chunk data", tioevent.String(), bid.String())
// once the entire chunk is received:
// 1) generate ReplicaPutAckEvent inside the common receiveReplicaData()
// 2) cleanup the corresponding accepted bids without waiting for them
// to self-expire
//
// 1) push it into the disk's local queue (receiveReplicaData)
// 2) generate ReplicaPutAckEvent (receiveReplicaData)
// 3) delete the bid from the local queue without waiting for it to self-expire
// 4) simulate 50% reading if requested via the bid itself
if r.receiveReplicaData(tioevent) == ReplicaDone {
r.bids.deleteBid(k)
// read
if bid.tio.repnum == 50 {
r.disk.lastIOdone = r.disk.lastIOdone.Add(configStorage.dskdurationDataChunk)
r.addBusyDuration(configStorage.sizeDataChunk*1024, configStorage.diskbps, DiskBusy)
log("read", bid.tio.String(), fmt.Sprintf("%-12.10v", r.disk.lastIOdone.Sub(time.Time{})))
}
}
default:
tio := ev.GetTio()
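
In the m7 model the read selection is round-robin by chunk id, and the "this replica is a read" marker is carried in-band via tio.repnum = 50 (the FIXME above); the chosen server then charges itself one extra chunk's worth of disk time. A reduced sketch of that selection and accounting; pickReadReplica, the constants, and the single lumped counter are illustration-only stand-ins, not the model's actual types:

```go
package main

import (
	"fmt"
	"time"
)

const (
	numReplicas          = 3
	dskdurationDataChunk = 320 * time.Microsecond // 128KB at 400MB/s, illustrative
)

// pickReadReplica mirrors the round-robin selection in M7receivebid():
// with -r enabled, one of the numReplicas replicas of each chunk, chosen
// by cid modulo numReplicas, is treated as a read rather than a write.
func pickReadReplica(cid int64, readEnabled bool) int {
	if !readEnabled {
		return -1
	}
	return int(cid % int64(numReplicas))
}

func main() {
	lastIOdone := time.Time{} // crude stand-in for a server's disk.lastIOdone
	for cid := int64(1); cid <= 6; cid++ {
		readK := pickReadReplica(cid, true)
		// in the model, the selected replica's server adds one chunk's worth
		// of disk time (the repnum == 50 branch in serverSeven.Run() above);
		// here all reads are lumped into a single counter for illustration
		lastIOdone = lastIOdone.Add(dskdurationDataChunk)
		fmt.Printf("chunk %d: read replica #%d, simulated read disk time so far %v\n",
			cid, readK, lastIOdone.Sub(time.Time{}))
	}
}
```
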
54 ma.go
@@ -294,12 +294,21 @@ func (r *serverSevenX) rxcallback(ev EventInterface) int {

log(LogVV, "SRV::rxcallback:chunk-data", tioevent.String(), bid.String())
// once the entire chunk is received:
// 1) insert it into the local disk's queue (see receiveReplicaData)
// 2) and generate ReplicaPutAckEvent (see receiveReplicaData)
// 1) push it into the disk's local queue (receiveReplicaData)
// 2) generate ReplicaPutAckEvent (receiveReplicaData)
// 3) delete the bid from the local queue
// 4) notify the proxy, to cleanup its local respective allQbids queue
// 4) simulate 50% reading if requested via the bid itself
// 5) finally, notify the proxy, to cleanup its local respective allQbids queue
if r.receiveReplicaData(tioevent) == ReplicaDone {
r.bids.deleteBid(k)
// read
if bid.win.right.Sub(bid.win.left) > configReplicast.durationBidWindow {
tio := ev.GetTio()
r.disk.lastIOdone = r.disk.lastIOdone.Add(configStorage.dskdurationDataChunk)
r.addBusyDuration(configStorage.sizeDataChunk*1024, configStorage.diskbps, DiskBusy)
log("read", tio.String(), fmt.Sprintf("%-12.10v", r.disk.lastIOdone.Sub(time.Time{})))
}

r.notifyProxyBidDone(bid)
}
case *ProxyBidsEvent:
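
Unlike m7, the ma model does not reuse repnum: the proxy stretches the accepted bid's window by 2*timeClusterTrip (see autoAcceptOnBehalf below), and the server infers a read whenever the window is wider than the standard bid window. A reduced sketch of that detection; the window type and the two duration values below are assumptions, only the comparison itself comes from the hunk above:

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-in for the bid window kept in PutBid.win.
type window struct {
	left, right time.Time
}

// isReadBid mirrors the check in serverSevenX.rxcallback(): a bid whose
// window is wider than the standard bid window must have been stretched
// by the proxy, which is how "also simulate a read here" is signaled
// without any extra control message.
func isReadBid(win window, durationBidWindow time.Duration) bool {
	return win.right.Sub(win.left) > durationBidWindow
}

func main() {
	durationBidWindow := 2 * time.Millisecond // assumed value
	timeClusterTrip := 100 * time.Microsecond // assumed value

	now := time.Time{}
	plainBid := window{left: now, right: now.Add(durationBidWindow)}
	readBid := window{left: now, right: now.Add(durationBidWindow + 2*timeClusterTrip)}

	fmt.Println("plain bid     -> read?", isReadBid(plainBid, durationBidWindow))
	fmt.Println("stretched bid -> read?", isReadBid(readBid, durationBidWindow))
}
```
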
@@ -343,6 +352,8 @@ func (r *serverSevenX) notifyProxyBidDone(bid *PutBid) {
assert(q0.r.GetID() == proxy.GetID())
assert(bid == q0.pending[0])
q0.deleteBid(0)

proxy.update_reservedIOdone(0, r.disk.lastIOdone)
return
}
// resolve proxy
@@ -446,14 +457,20 @@ func (r *serverSevenProxy) M7X_request(ev EventInterface) error {
// FIXME: debug, remove
r.logDebug(tioevent, bestqueues[0:], newleft, cstr)

// 4. - create bids for those selected
// 4. - to read or not to read? round robin between selected servers, if 50% reading requested via CLI
// - create bids for those selected
// - accept the bids right away
// - and send each of these new bids into their corresponding targets
bidsev := newProxyBidsEvent(r, gwy, ngobj, tioevent.cid, tioproxy, configStorage.numReplicas, tioevent.sizeb)
read_j := -1
if configStorage.read {
read_j = int(tioproxy.cid % int64(configStorage.numReplicas))
}
chkLatency := time.Duration(0)
for j := 0; j < configStorage.numReplicas; j++ {
bqj := bestqueues[j]
newbid, repLatency := r.autoAcceptOnBehalf(tioevent, bqj, newleft, ngobj)
isReader := (read_j == j)
newbid, repLatency := r.autoAcceptOnBehalf(tioevent, bqj, newleft, ngobj, isReader)
log("proxy-bid", newbid.String(), "ack-estimated", fmt.Sprintf("%-12.10v", r.reservedIOdone[bqj].Sub(time.Time{})), repLatency)

if repLatency > chkLatency {
@@ -542,7 +559,7 @@ func (r *serverSevenProxy) logDebug(tioevent *McastChunkPutRequestEvent, bestque
}
}

func (r *serverSevenProxy) handleBidDone(bid *PutBid, reservedIOdone time.Time) {
func (r *serverSevenProxy) handleBidDone(bid *PutBid, reservedIOdone_fromServer time.Time) {
srv := bid.tio.target
i := 1
for ; i < configReplicast.sizeNgtGroup; i++ {
@@ -565,9 +582,10 @@ func (r *serverSevenProxy) handleBidDone(bid *PutBid, reservedIOdone time.Time)
break
}
assert(i < configReplicast.sizeNgtGroup, srv.String())

// recompute based on the updated qi
r.update_reservedIOdone(i, reservedIOdone)
//
// recompute the server's disk queue that is tracked by the proxy via reservedIOdone[]
//
r.update_reservedIOdone(i, reservedIOdone_fromServer)
}

func (r *serverSevenProxy) update_reservedIOdone(i int, reservedIOdone_fromServer time.Time) {
@@ -682,14 +700,18 @@ func (r *serverSevenProxy) estimateSrvTimes(q *ServerSparseBidQueue, reservedIOd
}

// start writing immediately if the new chunk gets delivered *after*
// the server will have finished already queued writes:
// stime = ntime + delay
// the server will have finished already queued writes,
// otherwise finish them off first..
delay := diskdelay(ntime, reservedIOdone)
stime = ntime.Add(delay)
stime = ntime
if stime.Before(reservedIOdone) {
stime = reservedIOdone
}
stime = stime.Add(delay)
return newleft, stime, delay
}

func (r *serverSevenProxy) autoAcceptOnBehalf(tioevent *McastChunkPutRequestEvent, bqj int, newleft time.Time, ngobj GroupInterface) (*PutBid, time.Duration) {
func (r *serverSevenProxy) autoAcceptOnBehalf(tioevent *McastChunkPutRequestEvent, bqj int, newleft time.Time, ngobj GroupInterface, isReader bool) (*PutBid, time.Duration) {
qj := r.allQbids[bqj]
dj := r.reservedIOdone[bqj]
srv := qj.r
@@ -700,6 +722,12 @@ func (r *serverSevenProxy) autoAcceptOnBehalf(tioevent *McastChunkPutRequestEven

newbid := NewPutBid(tiochild, newleft)
newbid.state = bidStateAccepted

// simulate reading
if isReader {
newbid.win.right = newbid.win.right.Add(config.timeClusterTrip * 2)
}

tiochild.bid = newbid
qj.insertBid(newbid)
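
The "start writing time fix" from the commit title is the stime change in estimateSrvTimes() above: the write now starts no earlier than reservedIOdone, with the diskdelay penalty added on top, whereas the old code started from ntime unconditionally. A small sketch of the corrected estimate, under the assumption that diskdelay() can be replaced by a placeholder:

```go
package main

import (
	"fmt"
	"time"
)

// diskdelayStub stands in for the model's diskdelay(); the real function
// derives the extra queueing delay from the server's disk-queue state.
func diskdelayStub(ntime, reservedIOdone time.Time) time.Duration {
	if reservedIOdone.After(ntime) {
		return reservedIOdone.Sub(ntime) / 4 // arbitrary illustrative penalty
	}
	return 0
}

// estimateStartTime mirrors the fixed logic in estimateSrvTimes(): start
// writing at the later of (a) when the chunk is fully delivered (ntime) and
// (b) when the server drains its already-reserved writes, then add the delay.
func estimateStartTime(ntime, reservedIOdone time.Time) time.Time {
	delay := diskdelayStub(ntime, reservedIOdone)
	stime := ntime
	if stime.Before(reservedIOdone) {
		stime = reservedIOdone
	}
	return stime.Add(delay)
}

func main() {
	base := time.Time{}
	ntime := base.Add(1 * time.Millisecond)          // chunk delivery time
	reservedIOdone := base.Add(3 * time.Millisecond) // queued writes drain here
	fmt.Println("start writing at:", estimateStartTime(ntime, reservedIOdone).Sub(base))
}
```
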

18 node.go
@@ -467,10 +467,15 @@ func (r *ServerUch) receiveReplicaData(ev *ReplicaDataEvent) int {
if flow.offset < flow.totalbytes {
return ReplicaNotDoneYet
}
//
// postpone the ack until after the replica (chunk.sizeb) is written to disk
//
// persistence policies (important!):
// 1) write-through: always postpone the ack until after the replica is fully written
// 2) write-back(*): ack immediately if the new replica fits within the disk buffer space;
// otherwise delay
atdisk := r.disk.scheduleWrite(flow.totalbytes)
if atdisk < configStorage.dskdurationDataChunk*time.Duration(configStorage.maxDiskQueueChunks) {
atdisk = 0
}

r.addBusyDuration(flow.totalbytes, configStorage.diskbps, DiskBusy)

tio := ev.GetTio()
@@ -487,8 +492,11 @@ func (r *ServerUch) receiveReplicaData(ev *ReplicaDataEvent) int {
} else {
cstr = fmt.Sprintf("c#%d", flow.sid)
}
// gwyacktime := fmt.Sprintf("%-12.10v", putackev.GetTriggerTime().Sub(time.Time{}))
log("srv-replica-received", r.String(), cstr, "replica-ack-scheduled", fmt.Sprintf("%-12.10v", r.disk.lastIOdone.Sub(time.Time{})), "atd", atdisk)
if atdisk > 0 {
log("rep-received-ack-delayed", r.String(), cstr, atdisk)
} else {
log("rep-received", r.String(), cstr, atdisk)
}
r.flowsfrom.deleteFlow(gwy)
return ReplicaDone
}
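
The write-back(*) policy from the commit title comes down to the atdisk check above: the ack goes out immediately as long as the newly scheduled write still fits within maxDiskQueueChunks worth of buffered chunk-writes, and is delayed otherwise (write-through behavior under queue pressure). A reduced sketch of that decision; scheduleWrite() is replaced by a running counter and both constants are assumed values:

```go
package main

import (
	"fmt"
	"time"
)

const (
	dskdurationDataChunk = 320 * time.Microsecond // 128KB at 400MB/s, illustrative
	maxDiskQueueChunks   = 2                      // assumed buffer budget, in chunks
)

// ackDelay mirrors the policy in receiveReplicaData(): atdisk is assumed to
// be what disk.scheduleWrite() returns -- how long the newly queued replica
// waits behind earlier writes. If that still fits within the buffered-chunks
// budget, the ack is not delayed (write-back); otherwise the full delay
// applies (write-through).
func ackDelay(atdisk time.Duration) time.Duration {
	if atdisk < dskdurationDataChunk*time.Duration(maxDiskQueueChunks) {
		return 0
	}
	return atdisk
}

func main() {
	queued := time.Duration(0)
	for i := 1; i <= 4; i++ {
		atdisk := queued // crude stand-in for scheduleWrite()'s return value
		fmt.Printf("replica %d: behind=%v ack-delay=%v\n", i, atdisk, ackDelay(atdisk))
		queued += dskdurationDataChunk
	}
}
```
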
