Skip to content

Commit

Permalink
Add crawler interval
Browse files Browse the repository at this point in the history
  • Loading branch information
khlieng committed Jul 5, 2015
1 parent f31eb2e commit 3c14c44
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 11 deletions.
4 changes: 3 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net/http"
"path"
"strconv"
"time"

"github.com/Castcloud/castcloud-go-server/Godeps/_workspace/src/github.com/labstack/echo"
mw "github.com/Castcloud/castcloud-go-server/Godeps/_workspace/src/github.com/labstack/echo/middleware"
Expand All @@ -24,6 +25,7 @@ type Config struct {
Dir string
Debug bool

CrawlInterval time.Duration
MaxDownloadConnections int
}

Expand All @@ -40,7 +42,7 @@ func Serve() {
openStore(path.Join(config.Dir, "store"))
authCache = newMemAuthCache()

crawl = newCrawler()
crawl = newCrawler(config.CrawlInterval)
crawl.start(config.MaxDownloadConnections)

if user := store.GetUser("test"); user == nil {
Expand Down
3 changes: 2 additions & 1 deletion api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"
"os"
"testing"
"time"

. "github.com/Castcloud/castcloud-go-server/api/schema"
)
Expand All @@ -32,7 +33,7 @@ func TestMain(m *testing.M) {
testRSS = testServer.URL + "/rss"
testAtom = testServer.URL + "/atom"

crawl = newCrawler()
crawl = newCrawler(time.Hour)
crawl.start(4)

os.Exit(m.Run())
Expand Down
2 changes: 1 addition & 1 deletion api/casts.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
//
func getCasts(c *echo.Context) error {
user := c.Get("user").(*User)
return c.JSON(200, store.GetCasts(user.Subscriptions))
return c.JSON(200, store.GetCastsByID(user.Subscriptions))
}

//
Expand Down
30 changes: 29 additions & 1 deletion api/crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ const (
)

type crawler struct {
interval time.Duration
fetching chan fetchJob
saving chan saveJob
quit chan struct{}
}

type fetchJob struct {
Expand All @@ -31,10 +33,12 @@ type saveJob struct {
result chan *Cast
}

func newCrawler() *crawler {
func newCrawler(interval time.Duration) *crawler {
return &crawler{
interval: interval,
fetching: make(chan fetchJob, 4096),
saving: make(chan saveJob, 256),
quit: make(chan struct{}),
}
}

Expand All @@ -46,9 +50,12 @@ func (c *crawler) start(maxConn int) {
for i := 0; i < 64; i++ {
go c.saver()
}

go c.run()
}

func (c *crawler) stop() {
close(c.quit)
close(c.fetching)
close(c.saving)
}
Expand All @@ -59,6 +66,27 @@ func (c *crawler) fetch(url string) chan *Cast {
return resultCh
}

func (c *crawler) run() {
tick := time.Tick(c.interval)

c.crawlCasts()
for {
select {
case <-c.quit:
return

case <-tick:
c.crawlCasts()
}
}
}

func (c *crawler) crawlCasts() {
for _, cast := range store.GetCasts() {
c.fetch(cast.URL)
}
}

func (c *crawler) fetcher() {
for {
job, ok := <-c.fetching
Expand Down
3 changes: 2 additions & 1 deletion api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ type APIStore interface {
RemoveSubscription(userid, castid uint64) (*User, error)

GetCast(id uint64) *Cast
GetCasts(ids []uint64) []Cast
GetCasts() []Cast
GetCastsByID(ids []uint64) []Cast
GetCastByURL(url string) *Cast
SaveCast(cast *Cast) error

Expand Down
19 changes: 18 additions & 1 deletion api/store_bolt_casts.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,24 @@ func (s *BoltStore) GetCast(id uint64) *Cast {
return cast
}

func (s *BoltStore) GetCasts(ids []uint64) []Cast {
func (s *BoltStore) GetCasts() []Cast {
casts := []Cast{}
cast := &Cast{}

s.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(boltBucketCasts)

return b.ForEach(func(k, v []byte) error {
cast.UnmarshalMsg(v)
casts = append(casts, *cast)
return nil
})
})

return casts
}

func (s *BoltStore) GetCastsByID(ids []uint64) []Cast {
casts := make([]Cast, len(ids))

s.db.View(func(tx *bolt.Tx) error {
Expand Down
15 changes: 15 additions & 0 deletions api/store_bolt_casts_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package api

import (
"testing"

"github.com/Castcloud/castcloud-go-server/Godeps/_workspace/src/github.com/stretchr/testify/assert"
)

func TestStoreGetCasts(t *testing.T) {
casts := store.GetCasts()
assert.NotNil(t, casts)
assert.True(t, len(casts) > 0)
assert.Equal(t, uint64(1), casts[0].ID)
assert.Equal(t, "test.go", casts[0].URL)
}
4 changes: 2 additions & 2 deletions assets/bindata.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (fi bindata_file_info) Sys() interface{} {
return nil
}

var _config_default_toml = "\x23\x20\x54\x68\x69\x73\x20\x69\x73\x20\x74\x68\x65\x20\x70\x6f\x72\x74\x20\x74\x68\x65\x20\x41\x50\x49\x20\x6c\x69\x73\x74\x65\x6e\x73\x20\x6f\x6e\x0a\x70\x6f\x72\x74\x20\x3d\x20\x33\x30\x30\x30\x0a\x0a\x5b\x63\x72\x61\x77\x6c\x5d\x0a\x23\x20\x4d\x61\x78\x69\x6d\x75\x6d\x20\x6e\x75\x6d\x62\x65\x72\x20\x6f\x66\x20\x63\x6f\x6e\x6e\x65\x63\x74\x69\x6f\x6e\x73\x20\x74\x6f\x20\x75\x73\x65\x20\x77\x68\x65\x6e\x20\x66\x65\x74\x63\x68\x69\x6e\x67\x20\x66\x65\x65\x64\x73\x0a\x6d\x61\x78\x5f\x63\x6f\x6e\x6e\x20\x3d\x20\x31\x32\x38"
var _config_default_toml = "\x70\x6f\x72\x74\x20\x3d\x20\x33\x30\x30\x30\x0a\x0a\x5b\x63\x72\x61\x77\x6c\x5d\x0a\x23\x20\x48\x6f\x77\x20\x6f\x66\x74\x65\x6e\x20\x66\x65\x65\x64\x73\x20\x73\x68\x6f\x75\x6c\x64\x20\x62\x65\x20\x66\x65\x74\x63\x68\x65\x64\x0a\x69\x6e\x74\x65\x72\x76\x61\x6c\x20\x3d\x20\x22\x31\x35\x6d\x22\x0a\x23\x20\x4d\x61\x78\x69\x6d\x75\x6d\x20\x6e\x75\x6d\x62\x65\x72\x20\x6f\x66\x20\x63\x6f\x6e\x6e\x65\x63\x74\x69\x6f\x6e\x73\x20\x74\x6f\x20\x75\x73\x65\x20\x77\x68\x65\x6e\x20\x66\x65\x74\x63\x68\x69\x6e\x67\x20\x66\x65\x65\x64\x73\x0a\x6d\x61\x78\x5f\x63\x6f\x6e\x6e\x20\x3d\x20\x31\x32\x38"

func config_default_toml_bytes() ([]byte, error) {
return bindata_read(
Expand All @@ -69,7 +69,7 @@ func config_default_toml() (*asset, error) {
return nil, err
}

info := bindata_file_info{name: "config.default.toml", size: 132, mode: os.FileMode(436), modTime: time.Unix(1434923886, 0)}
info := bindata_file_info{name: "config.default.toml", size: 147, mode: os.FileMode(436), modTime: time.Unix(1436139028, 0)}
a := &asset{bytes: bytes, info: info}
return a, nil
}
Expand Down
8 changes: 5 additions & 3 deletions cli/castcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ func init() {
viper.ReadInConfig()

api.Configure(&api.Config{
Port: viper.GetInt("port"),
Debug: viper.GetBool("debug"),
Dir: dir,
Port: viper.GetInt("port"),
Debug: viper.GetBool("debug"),
Dir: dir,
CrawlInterval: viper.GetDuration("crawl.interval"),
MaxDownloadConnections: viper.GetInt("crawl.max_conn"),
})
})
Expand Down Expand Up @@ -72,6 +73,7 @@ func bindFlags() {
}

func setDefaults() {
viper.SetDefault("crawl.interval", "15m")
viper.SetDefault("crawl.max_conn", 128)
}

Expand Down
2 changes: 2 additions & 0 deletions config.default.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
port = 3000

[crawl]
# How often feeds should be fetched
interval = "15m"
# Maximum number of connections to use when fetching feeds
max_conn = 128

0 comments on commit 3c14c44

Please sign in to comment.