Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Abstracted persistence.

  • Loading branch information...
commit 8d18292d301544e6aa98e7cb15679e1df6bffccf 1 parent a38a616
@dustin dustin authored
Showing with 65 additions and 17 deletions.
  1. +54 −0 data.go
  2. +3 −5 report.go
  3. +2 −4 stream.go
  4. +6 −8 updater.go
View
54 data.go
@@ -0,0 +1,54 @@
+package main
+
+import (
+ "github.com/dustin/gomemcached"
+ "github.com/dustin/gomemcached/client"
+)
+
+type persister interface {
+ CAS(k string, f memcached.CasFunc,
+ initexp int) (rv *gomemcached.MCResponse, err error)
+ Incr(key string,
+ amt, def uint64, exp int) (uint64, error)
+ Get(key string) (*gomemcached.MCResponse, error)
+ GetBulk(keys []string) (map[string]*gomemcached.MCResponse, error)
+
+ Close() error
+}
+
+type mcAdaptor struct {
+ mc *memcached.Client
+}
+
+func (m mcAdaptor) CAS(k string, f memcached.CasFunc,
+ initexp int) (rv *gomemcached.MCResponse, err error) {
+
+ return m.mc.CAS(0, k, f, initexp)
+}
+
+func (m mcAdaptor) Incr(key string,
+ amt, def uint64, exp int) (uint64, error) {
+
+ return m.mc.Incr(0, key, amt, def, exp)
+}
+
+func (m mcAdaptor) Get(key string) (*gomemcached.MCResponse, error) {
+ return m.mc.Get(0, key)
+}
+
+func (m mcAdaptor) GetBulk(keys []string) (map[string]*gomemcached.MCResponse, error) {
+ return m.mc.GetBulk(0, keys)
+}
+
+func (m mcAdaptor) Close() error {
+ m.mc.Close()
+ return nil
+}
+
+func getPersister(u string) (persister, error) {
+ client, err := memcached.Connect("tcp", u)
+ if err != nil {
+ return nil, err
+ }
+ return &mcAdaptor{client}, nil
+}
View
8 report.go
@@ -7,21 +7,19 @@ import (
"os"
"sort"
"text/tabwriter"
-
- "github.com/dustin/gomemcached/client"
)
func report(prefix string) {
log.Printf("Reporting on %v", prefix)
- client, err := memcached.Connect("tcp", *mcServer)
+ client, err := getPersister(*mcServer)
if err != nil {
- log.Printf("Error connecting to memcached: %v", err)
+ log.Printf("Error connecting to persister: %v", err)
return
}
defer client.Close()
- resp, err := client.Get(0, lPrefix+prefix)
+ resp, err := client.Get(lPrefix + prefix)
if err != nil {
log.Printf("Error reporting on %s: %v", prefix, err)
return
View
6 stream.go
@@ -11,8 +11,6 @@ import (
"os"
"strings"
"time"
-
- "github.com/dustin/gomemcached/client"
)
type Tweet struct {
@@ -45,9 +43,9 @@ func parseNext(d *json.Decoder) (rv Tweet, err error) {
}
func handle(ch <-chan Tweet, pch <-chan string) {
- client, err := memcached.Connect("tcp", *mcServer)
+ client, err := getPersister(*mcServer)
if err != nil {
- log.Fatalf("Error connecting to memcached: %v", err)
+ log.Fatalf("Error getting persister: %v", err)
}
defer client.Close()
prefix := ""
View
14 updater.go
@@ -7,8 +7,6 @@ import (
"strconv"
"strings"
"unicode"
-
- "github.com/dustin/gomemcached/client"
)
func removeBottom(r *ranks) {
@@ -70,8 +68,8 @@ func makeRanks(candidates map[string]uint64) ranks {
return rv
}
-func updateList(client *memcached.Client, prefix string, totals map[string]uint64) {
- _, err := client.CAS(0, lPrefix+prefix,
+func updateList(client persister, prefix string, totals map[string]uint64) {
+ _, err := client.CAS(lPrefix+prefix,
func(oldBody []byte) []byte {
if len(oldBody) == 0 {
b, err := json.Marshal(totals)
@@ -96,7 +94,7 @@ func updateList(client *memcached.Client, prefix string, totals map[string]uint6
keys = append(keys, cPrefix+prefix+k)
}
- m, err := client.GetBulk(0, keys)
+ m, err := client.GetBulk(keys)
if err != nil {
log.Fatalf("Error getting %v: %v", keys, err)
}
@@ -125,7 +123,7 @@ func updateList(client *memcached.Client, prefix string, totals map[string]uint6
}
}
-func process(client *memcached.Client, prefix string, t *Tweet) {
+func process(client persister, prefix string, t *Tweet) {
parts := strings.FieldsFunc(strings.ToLower(t.Text), func(r rune) bool {
return !(unicode.IsLetter(r) || r == '\'')
})
@@ -139,13 +137,13 @@ func process(client *memcached.Client, prefix string, t *Tweet) {
for w, count := range counts {
k := cPrefix + prefix + w
var err error
- totals[w], err = client.Incr(0, k, count, 1,
+ totals[w], err = client.Incr(k, count, 1,
10*int(windowSize.Seconds()))
if err != nil {
log.Fatalf("Error incrementing %v: %v", k, err)
}
if *foreverWords {
- _, err = client.Incr(0, cPrefix+w, count, 1, 0)
+ _, err = client.Incr(cPrefix+w, count, 1, 0)
}
}
updateList(client, prefix, totals)
Please sign in to comment.
Something went wrong with that request. Please try again.