Cron cleanup #2

Closed
wants to merge 27 commits into from
Changes from all commits
27 commits
ca2b467
Renaming JsonPostResponse.go -> JsonResponse.go
bign8 Sep 7, 2015
3a5f324
Begin spliting cron into multiple phases
bign8 Sep 8, 2015
c2f5766
Breaking crawler up into pieces
bign8 Sep 8, 2015
127a73b
Merge branch 'master' of https://github.com/bign8/chive-show into cro…
bign8 Oct 30, 2015
e70b329
Macking app work with new version of appengine builder
bign8 Nov 1, 2015
dd9dd11
Breaking up cron into (fetcher, dePager, parser, batcher, saver) [in …
bign8 Nov 1, 2015
87aa042
Cleaning up fetcher channel closures
bign8 Nov 1, 2015
a64b790
Finishing un-paginator
bign8 Nov 1, 2015
ad46e1c
Storing crawled posts
bign8 Nov 1, 2015
8dca8c3
Splitting up cron into multiple parts
bign8 Nov 10, 2015
8738e9a
Doing a bad idea (storing all the things super flat)
bign8 Nov 14, 2015
ac6a52f
Finding tags based on response data
bign8 Nov 15, 2015
a5ddd2a
Optimizing tags to take <15s
bign8 Nov 15, 2015
ea37ea5
Adding graph package (for serialized graphs)
bign8 Nov 15, 2015
4118439
Improving the /proj/tags endpoint performance
bign8 Nov 15, 2015
e61d9c4
Filling graph with proj endpoint (todo: shard and store)
bign8 Nov 15, 2015
a964f98
Adding datastore based sharder
bign8 Nov 16, 2015
3429d97
Pulling in go-routines
bign8 Nov 16, 2015
5781d36
Removing un-used master data
bign8 Nov 16, 2015
b4dec06
Removing un-needed writer complexity
bign8 Nov 16, 2015
2c19198
Attempting to add a unit test
bign8 Nov 16, 2015
c6ef61e
Storing the serialized graph
bign8 Nov 17, 2015
240c66a
Adding adjacency duplicate checks
bign8 Nov 17, 2015
6861dad
Fixing shards to not leave dead data in storage on write
bign8 Nov 19, 2015
b0398f9
Fixing flatten problems
bign8 Nov 19, 2015
348600f
Adding edge weights when duped
bign8 Nov 20, 2015
8ff6aff
Migrating Protobuffers to v3.0.0-beta-1
bign8 Nov 20, 2015
1 change: 1 addition & 0 deletions app/api/JsonPostResponse.go → app/api/JsonResponse.go
@@ -40,6 +40,7 @@ func (res *JSONResponse) write(w http.ResponseWriter) error {
	} else {
		out = string(items)
	}
	w.Header().Set("Content-Type", "application/json; charset=utf-8")
	fmt.Fprint(w, out)
	return err
}
53 changes: 53 additions & 0 deletions app/cron/chain/chain.go
@@ -0,0 +1,53 @@
package chain

import "sync"

// Worker is a function designed to fan out and perform work on a piece of Data
type Worker func(obj interface{}, out chan<- interface{}, idx int)

// FanOut allows lengthy workers to fan out on channel operations
func FanOut(count int, buff int, in <-chan interface{}, doIt Worker) <-chan interface{} {
	out := make(chan interface{}, buff)
	var wg sync.WaitGroup
	wg.Add(count)
	for i := 0; i < count; i++ {
		go func(idx int) {
			for obj := range in {
				doIt(obj, out, idx)
			}
			wg.Done()
		}(i)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// FanIn takes multiple channels and pushes their results into a single channel
func FanIn(buff int, cs ...<-chan interface{}) <-chan interface{} {
	var wg sync.WaitGroup
	out := make(chan interface{}, buff)

	// Start an output goroutine for each input channel in cs. output
	// copies values from c to out until c is closed, then calls wg.Done.
	output := func(c <-chan interface{}) {
		for n := range c {
			out <- n
		}
		wg.Done()
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Start a goroutine to close out once all the output goroutines are
	// done. This must start after the wg.Add call.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
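
For reference, a minimal sketch of how the new FanOut/FanIn helpers might be wired together; the import path and the gen helper are assumptions for illustration, not part of this PR.

package main

import (
	"fmt"

	"github.com/bign8/chive-show/app/cron/chain" // assumed import path for the chain package added in this PR
)

// gen is an illustrative helper that yields the given values and then closes.
func gen(vals ...int) <-chan interface{} {
	out := make(chan interface{}, len(vals))
	for _, v := range vals {
		out <- v
	}
	close(out)
	return out
}

func main() {
	// Merge two independent producers into a single stream...
	merged := chain.FanIn(4, gen(1, 2, 3), gen(4, 5, 6))

	// ...then fan that stream out to 3 workers that square each value.
	squares := chain.FanOut(3, 4, merged, func(obj interface{}, out chan<- interface{}, idx int) {
		n := obj.(int)
		out <- n * n
	})

	for v := range squares {
		fmt.Println(v) // 1 4 9 16 25 36, in some order
	}
}

FanIn merges any number of producer channels into one stream; FanOut then spreads that stream across a fixed pool of workers and closes its output once every worker has drained the input.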
27 changes: 27 additions & 0 deletions app/cron/crawler/Batcher.go
@@ -0,0 +1,27 @@
package crawler

import "appengine"

// Batcher takes input and batches to given sizes
func Batcher(c appengine.Context, in <-chan interface{}, size int) <-chan []interface{} {
	out := make(chan []interface{}, 10000)
	go func() {
		defer close(out)
		batch := make([]interface{}, size)
		count := 0
		for post := range in {
			batch[count] = post
			count++
			if count >= size {
				count = 0
				out <- batch
				batch = make([]interface{}, size) // allocate another chunk of memory
			}
		}
		c.Infof("Batcher: Finished Batching")
		if count > 0 {
			out <- batch[:count]
		}
	}()
	return out
}
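
The same batching idea, shown as a hedged standalone sketch without the App Engine context (batch is an illustrative name, not the PR's Batcher): read items off a channel, emit fixed-size slices, and flush any partial batch once the input closes.

package main

import "fmt"

// batch groups values from in into slices of at most size elements,
// flushing the trailing partial batch when in is closed.
func batch(in <-chan int, size int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		buf := make([]int, 0, size)
		for v := range in {
			buf = append(buf, v)
			if len(buf) == size {
				out <- buf
				buf = make([]int, 0, size) // fresh slice so the receiver owns the full batch
			}
		}
		if len(buf) > 0 {
			out <- buf
		}
	}()
	return out
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 25; i++ {
			in <- i
		}
		close(in)
	}()
	for b := range batch(in, 10) {
		fmt.Println(len(b), b) // batches of 10, 10 and 5
	}
}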
169 changes: 169 additions & 0 deletions app/cron/crawler/Fetcher.go
@@ -0,0 +1,169 @@
package crawler

import (
	"fmt"
	"io/ioutil"
	"net/http"
	"sync"
	"time"

	"appengine"
	"appengine/urlfetch"
)

func pageURL(idx int) string {
	return fmt.Sprintf("http://thechive.com/feed/?paged=%d", idx)
}

// Fetcher returns stream of un-processed xml posts
func Fetcher(c appengine.Context, workers int) <-chan interface{} {
	res := make(chan interface{}, 100)
	worker := &fetcher{
		res:     res,
		context: c,
		client:  urlfetch.Client(c),
	}
	go worker.Main(workers)
	return res
}

type fetcher struct {
	res     chan<- interface{}
	context appengine.Context
	client  *http.Client
	todo    chan int
}

func (x *fetcher) Main(workers int) error {
	defer close(x.res)

	// Check first item edge case
	if isStop, err := x.isStop(1); isStop || err != nil {
		x.context.Infof("Fetcher: Finished without recursive searching %v", err)
		return err
	}

	// Defer as many todo workers as necessary
	x.todo = make(chan int, 1000)

	// Number of batch fetchers to process
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		go func(idx int) {
			x.processTODO()
			wg.Done()
		}(i)
	}
	wg.Add(workers)

	err := x.Search(1, -1)

	// wait for processTODOs to finish
	wg.Wait()
	x.context.Infof("Complete with FETCHING")
	return err
}

func (x *fetcher) Search(bottom, top int) (err error) {
	/*
	   def infinite_length(bottom=1, top=-1):
	       if bottom == 1 and not item_exists(1): return 0  # Starting edge case
	       if bottom == top - 1: return bottom  # Result found! (top doesn’t exist)
	       if top < 0:  # Searching forward
	           top = bottom << 1  # Base 2 hops
	           if item_exists(top):
	               top, bottom = -1, top  # continue searching forward
	       else:  # Binary search between bottom and top
	           middle = (bottom + top) // 2
	           bottom, top = middle, top if item_exists(middle) else bottom, middle
	       return infinite_length(bottom, top)  # Tail recursion!!!
	*/
	if bottom == top-1 {
		x.context.Infof("Fetcher: TOP OF RANGE FOUND! @%d", top)
		x.addRange(bottom, top)
		close(x.todo)
		return nil
	}
	x.context.Infof("Fetcher: Search(%d, %d)", bottom, top)
	var isStop = false

	// Searching forward
	if top < 0 {
		top = bottom << 1 // Base 2 hops forward
		isStop, err = x.isStop(top)
		if err != nil {
			return err
		}
		if !isStop {
			x.addRange(bottom, top)
			top, bottom = -1, top
		}

		// Binary search between top and bottom
	} else {
		middle := (bottom + top) / 2
		isStop, err = x.isStop(middle)
		if err != nil {
			return err
		}
		if isStop {
			top = middle
		} else {
			x.addRange(bottom, middle)
			bottom = middle
		}
	}
	return x.Search(bottom, top) // TAIL RECURSION!!!
}

func (x *fetcher) isStop(idx int) (isStop bool, err error) {

	// Gather posts as necessary
	url := pageURL(idx)
	x.context.Infof("Fetcher: Fetching %s", url)
	resp, err := x.client.Get(url)
	if err != nil {
		x.context.Errorf("Fetcher: Error decoding ChiveFeed (1s sleep): %s", err)
		time.Sleep(time.Second)
		return x.isStop(idx) // Tail recursion (this loop may get us into trouble)
	}
	defer resp.Body.Close()

	// Check Response Codes for non-200 responses
	if resp.StatusCode != 200 {
		if resp.StatusCode == 404 {
			x.context.Infof("Fetcher: Reached the end of the feed list (%v)", idx)
			return true, nil
		}
		return true, fmt.Errorf("Fetcher: Feed parsing received a %d Status Code on (%s)", resp.StatusCode, url)
	}

	// Pull response content into String
	contents, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return true, err
	}
	x.res <- Data{
		KEY: url,
		XML: string(contents),
	}

	// Use store_count info to determine if isStop
	if DEBUG {
		isStop = idx >= DEPTH
	}
	return isStop, nil
}

func (x *fetcher) addRange(bottom, top int) {
	for i := bottom + 1; i < top; i++ {
		x.todo <- i
	}
}

func (x *fetcher) processTODO() {
	for idx := range x.todo {
		// x.context.Infof("Fetcher: NOT processing TODO %d", idx)
		x.isStop(idx)
	}
}
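
For clarity, here is the page-count search from Search/isStop as a hedged, self-contained sketch: double the upper bound until a page is missing, then binary search between the last page known to exist and the first known to be missing. lastPage and itemExists are illustrative names, not part of the PR.

package main

import "fmt"

// lastPage returns the index of the last page for which itemExists reports true,
// assuming pages 1..N exist and pages N+1 onward do not.
func lastPage(itemExists func(int) bool) int {
	if !itemExists(1) {
		return 0 // starting edge case: the feed is empty
	}
	bottom, top := 1, -1
	for {
		if bottom == top-1 {
			return bottom // top is the first missing page, so bottom is the last real one
		}
		if top < 0 { // searching forward with base-2 hops
			probe := bottom << 1
			if itemExists(probe) {
				bottom = probe // keep doubling
			} else {
				top = probe // found an upper bound; switch to binary search
			}
		} else { // binary search between bottom (exists) and top (missing)
			middle := (bottom + top) / 2
			if itemExists(middle) {
				bottom = middle
			} else {
				top = middle
			}
		}
	}
}

func main() {
	exists := func(n int) bool { return n <= 42 } // pretend the feed has 42 pages
	fmt.Println(lastPage(exists))                 // prints 42
}

The fetcher version additionally queues every index it skips over onto the todo channel so the worker pool can fetch those pages while the search continues.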
56 changes: 56 additions & 0 deletions app/cron/crawler/Storage.go
@@ -0,0 +1,56 @@
package crawler

import (
	"sync"

	"appengine"
	"appengine/datastore"
)

// Storage pushes items to datastore
func Storage(c appengine.Context, in <-chan []interface{}, workers int, loc string) {
	var store func(c appengine.Context, in <-chan []interface{}, x int, loc string)

	switch loc {
	case XML:
		store = runStorageData
	}

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		go func(x int) {
			store(c, in, x, loc)
			wg.Done()
		}(i)
	}
	wg.Add(workers)
	wg.Wait()
}

// Store single xml item to put in storage
type Store struct {
	XML []byte
}

func runStorageData(c appengine.Context, in <-chan []interface{}, x int, loc string) {
	var keys []*datastore.Key
	var items []Store

	for batch := range in {
		c.Infof("Storage %d: Storing Post chunk", x)
		keys = make([]*datastore.Key, len(batch))
		items = make([]Store, len(batch))
		for i, item := range batch {
			x := item.(Data)
			keys[i] = datastore.NewKey(c, loc, x.KEY, 0, nil)
			items[i] = Store{[]byte(x.XML)}
		}

		// c.Infof("Storage: Storing %v", keys)
		_, err := datastore.PutMulti(c, keys, items)
		if err != nil {
			c.Errorf("Storage %d: Error %s: %v %v", x, err, keys, items)
			panic(err)
		}
	}
}
65 changes: 65 additions & 0 deletions app/cron/crawler/crawler.go
@@ -0,0 +1,65 @@
package crawler

import (
	// "app/models"
	// "app/helpers/keycache"

	"appengine"
	"appengine/datastore"
	// "appengine/delay"
	// "appengine/taskqueue"

	"fmt"
	"net/http"
)

const (
	// DEBUG enable if troubleshooting algorithm
	DEBUG = false

	// DEPTH depth of feed mining
	DEPTH = 1

	// XML name of where xml posts pages are stored
	XML = "xml"

	// POST name of where xml posts are stored
	POST = "post"
)

// Data is a single fetched feed page: KEY is the page URL and XML is the raw response body.
type Data struct {
	KEY string
	XML string
}

// Crawl wires the pipeline together: fetch feed pages, batch them, and store the batches.
func Crawl(w http.ResponseWriter, r *http.Request) {
	c := appengine.NewContext(r)

	fetchers, storers := 50, 20

	// fetcher, dePager, parser, batcher, saver
	pages := Fetcher(c, fetchers)
	// posts := UnPager(c, pages, pagers)
	batch := Batcher(c, pages, 10)
	Storage(c, batch, storers, XML)

	fmt.Fprint(w, "Crawl Complete!")
}

// Stats reports how many xml pages are stored and the size of each.
func Stats(c appengine.Context, w http.ResponseWriter, r *http.Request) {

	q := datastore.NewQuery("xml")

	var data []Store
	keys, err := q.GetAll(c, &data)
	if err != nil {
		fmt.Fprintf(w, "Error %s", err)
		return
	}

	for idx, key := range keys {
		fmt.Fprintf(w, "Data %s: len %d\n", key, len(data[idx].XML))
	}

	fmt.Fprintf(w, "Overall %d", len(data))
}