Moved a bunch of params into global config.

dustin committed Sep 20, 2012
1 parent 5d92375 commit c327958d704c9492ac2c4d794e70ac4e08adcb19
Showing with 105 additions and 42 deletions.
  1. +53 −0 config/config.go
  2. +24 −0 config/config_test.go
  3. +1 −1 hash.go
  4. +1 −1 hash_test.go
  5. +15 −32 heartbeat.go
  6. +11 −8 main.go
config/config.go
@@ -0,0 +1,53 @@
+package cbfsconfig
+
+import (
+ "time"
+
+ "github.com/couchbaselabs/go-couchbase"
+)
+
+const dbKey = "/@globalConfig"
+
+// Cluster-wide configuration
+type CBFSConfig struct {
+ // Frequency of Object GC Process
+ GCFreq time.Duration `json:"gcfreq"`
+ // Hash algorithm to use
+ Hash string `json:"hash"`
+ // Expected heartbeat frequency
+ HeartbeatFreq time.Duration `json:"hbfreq"`
+ // Minimum number of replicas to try to keep
+ MinReplicas int `json:"minrepl"`
+ // Number of blobs to remove from a stale node per period
+ NodeCleanCount int `json:"cleanCount"`
+ // Reconciliation frequency
+ ReconcileFreq time.Duration `json:"reconcileFreq"`
+ // How often to check for stale nodes
+ StaleNodeCheckFreq time.Duration `json:"nodeCheckFreq"`
+ // Time since the last heartbeat at which we consider a node stale
+ StaleNodeLimit time.Duration `json:"staleLimit"`
+}
+
+// Get the default configuration
+func DefaultConfig() CBFSConfig {
+ return CBFSConfig{
+ GCFreq: time.Minute * 5,
+ Hash: "sha1",
+ HeartbeatFreq: time.Second * 5,
+ MinReplicas: 3,
+ NodeCleanCount: 1000,
+ ReconcileFreq: time.Hour * 24,
+ StaleNodeCheckFreq: time.Minute,
+ StaleNodeLimit: time.Minute * 10,
+ }
+}
+
+// Update this config within a bucket.
+func (conf CBFSConfig) StoreConfig(db *couchbase.Bucket) error {
+ return db.Set(dbKey, &conf)
+}
+
+// Update this config from the db.
+func (conf *CBFSConfig) RetrieveConfig(db *couchbase.Bucket) error {
+ return db.Get(dbKey, conf)
+}
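
For context, a minimal sketch of how a caller might persist and reload this configuration. The connection parameters below are placeholders, not values from this commit:

package main

import (
	"log"

	"github.com/couchbaselabs/cbfs/config"
	"github.com/couchbaselabs/go-couchbase"
)

func main() {
	// Hypothetical connection parameters.
	bucket, err := couchbase.GetBucket("http://localhost:8091/",
		"default", "default")
	if err != nil {
		log.Fatalf("Can't connect to couchbase: %v", err)
	}

	// Seed the cluster with the compiled-in defaults.
	conf := cbfsconfig.DefaultConfig()
	if err := conf.StoreConfig(bucket); err != nil {
		log.Fatalf("Error storing config: %v", err)
	}

	// Any node can then pick up the shared settings.
	loaded := cbfsconfig.CBFSConfig{}
	if err := loaded.RetrieveConfig(bucket); err != nil {
		log.Printf("Error retrieving global config: %v", err)
	}
	log.Printf("Heartbeat frequency: %v", loaded.HeartbeatFreq)
}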
config/config_test.go
@@ -0,0 +1,24 @@
+package cbfsconfig
+
+import (
+ "encoding/json"
+ "reflect"
+ "testing"
+)
+
+func TestJSONRoundTrip(t *testing.T) {
+ conf := DefaultConfig()
+ d, err := json.Marshal(&conf)
+ if err != nil {
+ t.Fatalf("Error marshaling config: %v", err)
+ }
+
+ conf2 := CBFSConfig{}
+ err = json.Unmarshal(d, &conf2)
+ if err != nil {
+ t.Fatalf("Error unmarshalling: %v", err)
+ }
+ if !reflect.DeepEqual(conf, conf2) {
+ t.Fatalf("Unmarshalled value is different:\n%v\n%v", conf, conf2)
+ }
+}
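
Worth noting: time.Duration is an int64 count of nanoseconds, and encoding/json marshals it as a plain integer, so this round trip is lossless and the reflect.DeepEqual comparison is reliable. The flip side is that stored configs hold nanosecond counts rather than human-readable strings like "5m".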
hash.go
@@ -23,7 +23,7 @@ var hashBuilders = map[string]func() hash.Hash{
}
func getHash() hash.Hash {
- h, ok := hashBuilders[*hashType]
+ h, ok := hashBuilders[globalConfig.Hash]
if !ok {
return nil
}
hash_test.go
@@ -43,7 +43,7 @@ func initData() {
func benchHash(h string, b *testing.B) {
once.Do(initData)
- *hashType = h
+ globalConfig.Hash = h
b.SetBytes(int64(len(randomData)))
for i := 0; i < b.N; i++ {
sh := getHash()
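
The hash.go and hash_test.go hunks are the mechanical half of the commit: each read of an old flag pointer (*hashType) becomes a read of the corresponding field on the shared globalConfig value.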
heartbeat.go
@@ -20,49 +20,33 @@ import (
"github.com/dustin/gomemcached/client"
)
-var heartFreq = flag.Duration("heartbeat", 10*time.Second,
- "Heartbeat frequency")
-var reconcileFreq = flag.Duration("reconcile", 24*time.Hour,
- "Reconciliation frequency")
-var staleNodeFreq = flag.Duration("staleNodeCheck", 5*time.Minute,
- "How frequently to check for stale nodes.")
-var staleNodeLimit = flag.Duration("staleNodeLimit", 15*time.Minute,
- "How long until we clean up nodes for being too stale")
-var nodeCleanCount = flag.Int("nodeCleanCount", 1000,
- "How many blobs to clean up from a dead node per period")
var verifyWorkers = flag.Int("verifyWorkers", 4,
"Number of object verification workers.")
-var garbageCollectFreq = flag.Duration("gcFreq", 5*time.Minute,
- "How frequently to check dangling blobs.")
var maxStartupObjects = flag.Int("maxStartObjs", 1000,
"Maximum number of objects to pull on start")
var maxStartupRepls = flag.Int("maxStartRepls", 3,
"Blob replication limit for startup objects.")
-var minReplicas = flag.Int("minReplicas", 3,
- "Minimum number of replicas to try to keep")
type PeriodicJob struct {
- period time.Duration
+ period func() time.Duration
f func() error
}
var periodicJobs = map[string]*PeriodicJob{
"checkStaleNodes": &PeriodicJob{
- time.Minute * 5,
+ func() time.Duration {
+ return globalConfig.StaleNodeCheckFreq
+ },
checkStaleNodes,
},
"garbageCollectBlobs": &PeriodicJob{
- time.Minute * 5,
+ func() time.Duration {
+ return globalConfig.GCFreq
+ },
garbageCollectBlobs,
},
}
-func adjustPeriodicJobs() error {
- periodicJobs["checkStaleNodes"].period = *staleNodeFreq
- periodicJobs["garbageCollectBlobs"].period = *garbageCollectFreq
- return nil
-}
-
type JobMarker struct {
Node string `json:"node"`
Started time.Time `json:"started"`
@@ -117,14 +101,13 @@ func heartbeat() {
Type: "node",
Time: time.Now().UTC(),
BindAddr: *bindAddr,
- Hash: *hashType,
}
err = couchbase.Set("/"+serverId, aboutMe)
if err != nil {
log.Printf("Failed to record a heartbeat: %v", err)
}
- time.Sleep(*heartFreq)
+ time.Sleep(globalConfig.HeartbeatFreq)
}
}
@@ -192,7 +175,7 @@ func reconcile() error {
}
func reconcileLoop() {
- if *reconcileFreq == 0 {
+ if globalConfig.ReconcileFreq == 0 {
return
}
for {
@@ -201,7 +184,7 @@ func reconcileLoop() {
log.Printf("Error in reconciliation loop: %v", err)
}
grabSomeData()
- time.Sleep(*reconcileFreq)
+ time.Sleep(globalConfig.ReconcileFreq)
}
}
@@ -293,7 +276,7 @@ func cleanupNode(node string) {
vres, err := couchbase.View("cbfs", "node_blobs",
map[string]interface{}{
"key": `"` + node + `"`,
- "limit": *nodeCleanCount,
+ "limit": globalConfig.NodeCleanCount,
"reduce": false,
"stale": false,
})
@@ -306,7 +289,7 @@ func cleanupNode(node string) {
numOwners := removeBlobOwnershipRecord(r.ID[1:], node)
foundRows++
- if numOwners < *minReplicas {
+ if numOwners < globalConfig.MinReplicas {
salvageBlob(r.ID[1:], node, nodes)
}
}
@@ -333,7 +316,7 @@ func checkStaleNodes() error {
for _, node := range nl {
d := time.Since(node.Time)
- if d > *staleNodeLimit {
+ if d > globalConfig.StaleNodeLimit {
if node.IsLocal() {
log.Printf("Would've cleaned up myself after %v",
d)
@@ -530,12 +513,12 @@ func grabSomeData() {
func runPeriodicJob(name string, job *PeriodicJob) {
time.Sleep(time.Second * time.Duration(5+rand.Intn(60)))
for {
- if runNamedGlobalTask(name, job.period, job.f) {
+ if runNamedGlobalTask(name, job.period(), job.f) {
log.Printf("Attempted job %v", name)
} else {
log.Printf("Didn't run job %v", name)
}
- time.Sleep(job.period + time.Second)
+ time.Sleep(job.period() + time.Second)
}
}
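
Changing period from a stored time.Duration to a func() time.Duration is what lets a running node pick up new intervals without a restart: the loop re-evaluates the closure on every iteration, which also makes the old adjustPeriodicJobs step unnecessary. A standalone sketch of the pattern (names here are illustrative, not from the commit):

package main

import (
	"log"
	"time"
)

// job re-reads its interval through a closure each time around the
// loop, so a change to the underlying value is seen on the next tick.
type job struct {
	period func() time.Duration
	f      func() error
}

func main() {
	interval := 100 * time.Millisecond

	j := job{
		period: func() time.Duration { return interval },
		f:      func() error { log.Println("tick"); return nil },
	}

	for i := 0; i < 3; i++ {
		if err := j.f(); err != nil {
			log.Printf("job failed: %v", err)
		}
		time.Sleep(j.period())
		interval *= 2 // the longer interval takes effect immediately
	}
}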
main.go
@@ -9,18 +9,21 @@ import (
"net/http"
"os"
"time"
+
+ "github.com/couchbaselabs/cbfs/config"
)
var bindAddr = flag.String("bind", ":8484", "Address to bind web thing to")
var root = flag.String("root", "storage", "Storage location")
-var hashType = flag.String("hash", "sha1", "Hash to use")
var couchbaseServer = flag.String("couchbase", "", "Couchbase URL")
var couchbaseBucket = flag.String("bucket", "default", "Couchbase bucket")
var cachePercentage = flag.Int("cachePercent", 100,
"Percentage of proxied requests to eagerly cache.")
var enableViewProxy = flag.Bool("viewProxy", false,
"Enable the view proxy")
+var globalConfig = cbfsconfig.DefaultConfig()
+
type prevMeta struct {
Headers http.Header `json:"headers"`
OID string `json:"oid"`
@@ -110,19 +113,14 @@ func main() {
if getHash() == nil {
fmt.Fprintf(os.Stderr,
"Unsupported hash specified: %v. Supported hashes:\n",
- *hashType)
+ globalConfig.Hash)
for h := range hashBuilders {
fmt.Fprintf(os.Stderr, " * %v\n", h)
}
os.Exit(1)
}
- err := adjustPeriodicJobs()
- if err != nil {
- log.Fatalf("Error adjusting periodic jobs from flags: %v", err)
- }
-
- err = initServerId()
+ err := initServerId()
if err != nil {
log.Fatalf("Error initializing server ID: %v", err)
}
@@ -132,6 +130,11 @@ func main() {
log.Fatalf("Can't connect to couchbase: %v", err)
}
+ err = globalConfig.RetrieveConfig(couchbase)
+ if err != nil {
+ log.Printf("Error retrieving global config: %v", err)
+ }
+
go heartbeat()
go reconcileLoop()
go runPeriodicJobs()
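
Note the startup behavior here: globalConfig starts as DefaultConfig(), and a failed RetrieveConfig (say, on a brand-new cluster where nothing has been stored yet) is only logged, so the node proceeds on the compiled-in defaults rather than refusing to start.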
