diff --git a/app/api/JsonPostResponse.go b/app/api/JsonResponse.go similarity index 93% rename from app/api/JsonPostResponse.go rename to app/api/JsonResponse.go index 90b9942..cd61b90 100644 --- a/app/api/JsonPostResponse.go +++ b/app/api/JsonResponse.go @@ -40,6 +40,7 @@ func (res *JSONResponse) write(w http.ResponseWriter) error { } else { out = string(items) } + w.Header().Set("Content-Type", "application/json; charset=utf-8") fmt.Fprint(w, out) return err } diff --git a/app/cron/chain/chain.go b/app/cron/chain/chain.go new file mode 100644 index 0000000..35e2952 --- /dev/null +++ b/app/cron/chain/chain.go @@ -0,0 +1,53 @@ +package chain + +import "sync" + +// Worker is a function designed to fan out and perform work on a piece of Data +type Worker func(obj interface{}, out chan<- interface{}, idx int) + +// FanOut allows lengthy workers to fan out on channel operations +func FanOut(count int, buff int, in <-chan interface{}, doIt Worker) <-chan interface{} { + out := make(chan interface{}, buff) + var wg sync.WaitGroup + wg.Add(count) + for i := 0; i < count; i++ { + go func(idx int) { + for obj := range in { + doIt(obj, out, idx) + } + wg.Done() + }(i) + } + go func() { + wg.Wait() + close(out) + }() + return out +} + +// FanIn takes multiple channels and pushes their results into a single channel +func FanIn(buff int, cs ...<-chan interface{}) <-chan interface{} { + var wg sync.WaitGroup + out := make(chan interface{}, buff) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan interface{}) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + go output(c) + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call.
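+	// (Starting it before wg.Add could let wg.Wait return early and close
+	// out while output goroutines are still sending.)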
+ go func() { + wg.Wait() + close(out) + }() + return out +} diff --git a/app/cron/crawler/Batcher.go b/app/cron/crawler/Batcher.go new file mode 100644 index 0000000..720b28e --- /dev/null +++ b/app/cron/crawler/Batcher.go @@ -0,0 +1,27 @@ +package crawler + +import "appengine" + +// Batcher takes input and batches to given sizes +func Batcher(c appengine.Context, in <-chan interface{}, size int) <-chan []interface{} { + out := make(chan []interface{}, 10000) + go func() { + defer close(out) + batch := make([]interface{}, size) + count := 0 + for post := range in { + batch[count] = post + count++ + if count >= size { + count = 0 + out <- batch + batch = make([]interface{}, size) // allocate another chunk of memory + } + } + c.Infof("Batcher: Finished Batching") + if count > 0 { + out <- batch[:count] + } + }() + return out +} diff --git a/app/cron/crawler/Fetcher.go b/app/cron/crawler/Fetcher.go new file mode 100644 index 0000000..9afd088 --- /dev/null +++ b/app/cron/crawler/Fetcher.go @@ -0,0 +1,169 @@ +package crawler + +import ( + "fmt" + "io/ioutil" + "net/http" + "sync" + "time" + + "appengine" + "appengine/urlfetch" +) + +func pageURL(idx int) string { + return fmt.Sprintf("http://thechive.com/feed/?paged=%d", idx) +} + +// Fetcher returns stream of un-processed xml posts +func Fetcher(c appengine.Context, workers int) <-chan interface{} { + res := make(chan interface{}, 100) + worker := &fetcher{ + res: res, + context: c, + client: urlfetch.Client(c), + } + go worker.Main(workers) + return res +} + +type fetcher struct { + res chan<- interface{} + context appengine.Context + client *http.Client + todo chan int +} + +func (x *fetcher) Main(workers int) error { + defer close(x.res) + + // Check first item edge case + if isStop, err := x.isStop(1); isStop || err != nil { + x.context.Infof("Fetcher: Finished without recursive searching %v", err) + return err + } + + // Defer as many todo workers as necessary + x.todo = make(chan int, 1000) + + // Number of batch fetchers to process + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + go func(idx int) { + x.processTODO() + wg.Done() + }(i) + } + wg.Add(workers) + + err := x.Search(1, -1) + + // wait for processTODOs to finish + wg.Wait() + x.context.Infof("Complete with FETCHING") + return err +} + +func (x *fetcher) Search(bottom, top int) (err error) { + /* + def infinite_length(bottom=1, top=-1): + if bottom == 1 and not item_exists(1): return 0 # Starting edge case + if bottom == top - 1: return bottom # Result found! (top doesn’t exist) + if top < 0: # Searching forward + top = bottom << 1 # Base 2 hops + if item_exists(top): + top, bottom = -1, top # continue searching forward + else: # Binary search between bottom and top + middle = (bottom + top) // 2 + bottom, top = middle, top if item_exists(middle) else bottom, middle + return infinite_length(bottom, top) # Tail recursion!!! + */ + if bottom == top-1 { + x.context.Infof("Fetcher: TOP OF RANGE FOUND! 
@%d", top) + x.addRange(bottom, top) + close(x.todo) + return nil + } + x.context.Infof("Fetcher: Search(%d, %d)", bottom, top) + var isStop = false + + // Searching forward + if top < 0 { + top = bottom << 1 // Base 2 hops forward + isStop, err = x.isStop(top) + if err != nil { + return err + } + if !isStop { + x.addRange(bottom, top) + top, bottom = -1, top + } + + // Binary search between top and bottom + } else { + middle := (bottom + top) / 2 + isStop, err = x.isStop(middle) + if err != nil { + return err + } + if isStop { + top = middle + } else { + x.addRange(bottom, middle) + bottom = middle + } + } + return x.Search(bottom, top) // TAIL RECURSION!!! +} + +func (x *fetcher) isStop(idx int) (isStop bool, err error) { + + // Gather posts as necessary + url := pageURL(idx) + x.context.Infof("Fetcher: Fetching %s", url) + resp, err := x.client.Get(url) + if err != nil { + x.context.Errorf("Fetcher: Error decoding ChiveFeed (1s sleep): %s", err) + time.Sleep(time.Second) + return x.isStop(idx) // Tail recursion (this loop may get us into trouble) + } + defer resp.Body.Close() + + // Check Response Codes for non-200 responses + if resp.StatusCode != 200 { + if resp.StatusCode == 404 { + x.context.Infof("Fetcher: Reached the end of the feed list (%v)", idx) + return true, nil + } + return true, fmt.Errorf("Fetcher: Feed parcing received a %d Status Code on (%s)", resp.StatusCode, url) + } + + // Pull response content into String + contents, err := ioutil.ReadAll(resp.Body) + if err != nil { + return true, err + } + x.res <- Data{ + KEY: url, + XML: string(contents), + } + + // Use store_count info to determine if isStop + if DEBUG { + isStop = idx >= DEPTH + } + return isStop, nil +} + +func (x *fetcher) addRange(bottom, top int) { + for i := bottom + 1; i < top; i++ { + x.todo <- i + } +} + +func (x *fetcher) processTODO() { + for idx := range x.todo { + // x.context.Infof("Fetcher: NOT processing TODO %d", idx) + x.isStop(idx) + } +} diff --git a/app/cron/crawler/Storage.go b/app/cron/crawler/Storage.go new file mode 100644 index 0000000..a527a16 --- /dev/null +++ b/app/cron/crawler/Storage.go @@ -0,0 +1,56 @@ +package crawler + +import ( + "sync" + + "appengine" + "appengine/datastore" +) + +// Storage push items to datastore +func Storage(c appengine.Context, in <-chan []interface{}, workers int, loc string) { + var store func(c appengine.Context, in <-chan []interface{}, x int, loc string) + + switch loc { + case XML: + store = runStorageData + } + + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + go func(x int) { + store(c, in, x, loc) + wg.Done() + }(i) + } + wg.Add(workers) + wg.Wait() +} + +// Store single xml item to put in storage +type Store struct { + XML []byte +} + +func runStorageData(c appengine.Context, in <-chan []interface{}, x int, loc string) { + var keys []*datastore.Key + var items []Store + + for batch := range in { + c.Infof("Storage %d: Storing Post chunk", x) + keys = make([]*datastore.Key, len(batch)) + items = make([]Store, len(batch)) + for i, item := range batch { + x := item.(Data) + keys[i] = datastore.NewKey(c, loc, x.KEY, 0, nil) + items[i] = Store{[]byte(x.XML)} + } + + // c.Infof("Storage: Storing %v", keys) + _, err := datastore.PutMulti(c, keys, items) + if err != nil { + c.Errorf("Storage %d: Error %s: %v %v", x, err, keys, items) + panic(err) + } + } +} diff --git a/app/cron/crawler/crawler.go b/app/cron/crawler/crawler.go new file mode 100644 index 0000000..ca203ab --- /dev/null +++ b/app/cron/crawler/crawler.go @@ -0,0 +1,65 @@ +package 
crawler + +import ( + // "app/models" + // "app/helpers/keycache" + + "appengine" + "appengine/datastore" + // "appengine/delay" + // "appengine/taskqueue" + + "fmt" + "net/http" +) + +const ( + // DEBUG enable if troubleshooting algorithm + DEBUG = false + + // DEPTH depth of feed mining + DEPTH = 1 + + // XML name of where xml posts pages are stored + XML = "xml" + + // POST name of where xml posts are stored + POST = "post" +) + +type Data struct { + KEY string + XML string +} + +func Crawl(w http.ResponseWriter, r *http.Request) { + c := appengine.NewContext(r) + + fetchers, storers := 50, 20 + + // fetcher, dePager, parser, batcher, saver + pages := Fetcher(c, fetchers) + // posts := UnPager(c, pages, pagers) + batch := Batcher(c, pages, 10) + Storage(c, batch, storers, XML) + + fmt.Fprint(w, "Crawl Complete!") +} + +func Stats(c appengine.Context, w http.ResponseWriter, r *http.Request) { + + q := datastore.NewQuery("xml") + + var data []Store + keys, err := q.GetAll(c, &data) + if err != nil { + fmt.Fprintf(w, "Error %s", err) + return + } + + for idx, key := range keys { + fmt.Fprintf(w, "Data %s: len %d\n", key, len(data[idx].XML)) + } + + fmt.Fprintf(w, "Overall %d", len(data)) +} diff --git a/app/cron/cron.go b/app/cron/cron.go index d1a40ea..068d867 100644 --- a/app/cron/cron.go +++ b/app/cron/cron.go @@ -1,399 +1,432 @@ package cron import ( - "encoding/xml" "fmt" "net/http" - "net/url" - "regexp" - "strconv" - "github.com/bign8/chive-show/app/helpers/keycache" - "github.com/bign8/chive-show/app/models" + "github.com/bign8/chive-show/app/cron/crawler" + "github.com/bign8/chive-show/app/cron/proj" + "gopkg.in/mjibson/v1/appstats" "appengine" "appengine/datastore" - "appengine/delay" - "appengine/taskqueue" - "appengine/urlfetch" ) -const ( - // SIZE of a batch - SIZE = 10 - - // DEBUG enable if troubleshooting algorithm - DEBUG = true - - // DEPTH depth of feed mining - DEPTH = 1 - - // DEFERRED if deferreds should be processed deferred - DEFERRED = true -) - -// Init initializes cron handlers -func Init() { - http.Handle("/cron/parse", appstats.NewHandler(parseFeeds)) - http.HandleFunc("/cron/delete", delete) -} - -var ( - // ErrFeedParse404 if feed page is not found - ErrFeedParse404 = fmt.Errorf("Feed parcing recieved a %d Status Code", 404) -) - -func pageURL(idx int) string { - return fmt.Sprintf("http://thechive.com/feed/?paged=%d", idx) -} - -func parseFeeds(c appengine.Context, w http.ResponseWriter, r *http.Request) { - fp := new(feedParser) - err := fp.Main(c, w) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } else { - fmt.Fprint(w, "Parsed") - } -} - -type feedParser struct { - context appengine.Context - client *http.Client - - todo []int - guids map[int64]bool // this could be extremely large - posts []models.Post -} - -func (x *feedParser) Main(c appengine.Context, w http.ResponseWriter) error { - x.context = c - x.client = urlfetch.Client(c) - - // Load guids from DB - // TODO: do this with sharded keys - keys, err := datastore.NewQuery(models.POST).KeysOnly().GetAll(c, nil) - if err != nil { - c.Errorf("Error finding keys %v %v", err, appengine.IsOverQuota(err)) - return err - } - x.guids = map[int64]bool{} - for _, key := range keys { - x.guids[key.IntID()] = true - } - keys = nil - - // // DEBUG ONLY - // data, err := json.MarshalIndent(x.guids, "", " ") - // fmt.Fprint(w, string(data)) - // return err - x.posts = make([]models.Post, 0) - - // Initial recursive edge case - isStop, fullStop, err := x.isStop(1) - if isStop || 
fullStop || err != nil { - c.Infof("Finished without recursive searching %v", err) - if err == nil { - err = x.storePosts(x.posts) - } - return err - } - - // Recursive search strategy - err = x.Search(1, -1) - - // storePosts and processTodo - if err == nil { - errc := make(chan error) - go func() { - errc <- x.storePosts(x.posts) - }() - go func() { - errc <- x.processTodo() - }() - err1, err2 := <-errc, <-errc - if err1 != nil { - err = err1 - } else if err2 != nil { - err = err2 - } - } - - if err != nil { - c.Errorf("Error in Main %v", err) - } - return err -} - -var processBatchDeferred = delay.Func("process-todo-batch", func(c appengine.Context, ids []int) { - parser := feedParser{ - context: c, - client: urlfetch.Client(c), - } - parser.processBatch(ids) -}) - -func (x *feedParser) processBatch(ids []int) error { - done := make(chan error) - for _, idx := range ids { - go func(idx int) { - posts, err := x.getAndParseFeed(idx) - if err == nil { - err = x.storePosts(posts) - } - done <- err - }(idx) - } - for i := 0; i < len(ids); i++ { - err := <-done - if err != nil { - x.context.Errorf("error storing feed (at index %d): %v", i, err) - return err - } - } - return nil -} - -func (x *feedParser) processTodo() error { - x.context.Infof("Processing TODO: %v", x.todo) - - var batch []int - var task *taskqueue.Task - var allTasks []*taskqueue.Task - var err error - for _, idx := range x.todo { - if batch == nil { - batch = make([]int, 0) - } - batch = append(batch, idx) - if len(batch) >= SIZE { - if DEFERRED { - task, err = processBatchDeferred.Task(batch) - if err == nil { - allTasks = append(allTasks, task) - } - } else { - err = x.processBatch(batch) - } - if err != nil { - return err - } - batch = nil +// const ( +// // SIZE of a batch +// SIZE = 10 +// +// // DEBUG enable if troubleshooting algorithm +// DEBUG = true +// +// // DEPTH depth of feed mining +// DEPTH = 1 +// +// // DEFERRED if deferreds should be processed deferred +// DEFERRED = true +// ) + +func cleanup(c appengine.Context, name string) error { + c.Infof("Cleaning %s", name) + q := datastore.NewQuery(name).KeysOnly() + keys, err := q.GetAll(c, nil) + s := 100 + for len(keys) > 0 { + if len(keys) < 100 { + s = len(keys) } - } - if len(batch) > 0 { - if DEFERRED { - task, err = processBatchDeferred.Task(batch) - if err == nil { - allTasks = append(allTasks, task) - } - } else { - err = x.processBatch(batch) - } - } - if DEFERRED && len(allTasks) > 0 { - x.context.Infof("Adding %d task(s) to the default queue", len(allTasks)) - taskqueue.AddMulti(x.context, allTasks, "default") + err = datastore.DeleteMulti(c, keys[:s]) + keys = keys[s:] } return err } -func (x *feedParser) addRange(bottom, top int) { - for i := bottom + 1; i < top; i++ { - x.todo = append(x.todo, i) - } -} - -func (x *feedParser) Search(bottom, top int) (err error) { - /* - def infinite_length(bottom=1, top=-1): - if bottom == 1 and not item_exists(1): return 0 # Starting edge case - if bottom == top - 1: return bottom # Result found! (top doesn’t exist) - if top < 0: # Searching forward - top = bottom << 1 # Base 2 hops - if item_exists(top): - top, bottom = -1, top # continue searching forward - else: # Binary search between bottom and top - middle = (bottom + top) // 2 - bottom, top = middle, top if item_exists(middle) else bottom, middle - return infinite_length(bottom, top) # Tail recursion!!! - */ - if bottom == top-1 { - x.context.Infof("TOP OF RANGE FOUND! 
@%d", top) - x.addRange(bottom, top) - return nil - } - var fullStop, isStop bool = false, false - if top < 0 { // Searching forward - top = bottom << 1 // Base 2 hops forward - isStop, fullStop, err = x.isStop(top) - if err != nil { - return err - } - if !isStop { - x.addRange(bottom, top) - top, bottom = -1, top - } - } else { // Binary search between top and bottom - middle := (bottom + top) / 2 - isStop, fullStop, err = x.isStop(middle) - if err != nil { - return err - } - if isStop { - top = middle - } else { - x.addRange(bottom, middle) - bottom = middle - } - } - if fullStop { - return nil - } - return x.Search(bottom, top) // TAIL RECURSION!!! -} - -func (x *feedParser) isStop(idx int) (isStop, fullStop bool, err error) { - // Gather posts as necessary - posts, err := x.getAndParseFeed(idx) - if err == ErrFeedParse404 { - x.context.Infof("Reached the end of the feed list (%v)", idx) - return true, false, nil - } - if err != nil { - x.context.Errorf("Error decoding ChiveFeed: %s", err) - return false, false, err - } - - // Check for Duplicates - count := 0 - for _, post := range posts { - id, _, err := guidToInt(post.GUID) - if x.guids[id] || err != nil { - continue - } - count++ - } - x.posts = append(x.posts, posts...) - - // Use store_count info to determine if isStop - isStop = count == 0 || DEBUG - fullStop = len(posts) != count && count > 0 - if DEBUG { - isStop = idx > DEPTH - fullStop = idx == DEPTH - } - return -} - -func (x *feedParser) getAndParseFeed(idx int) ([]models.Post, error) { - url := pageURL(idx) - - // Get Response - x.context.Infof("Parsing index %v (%v)", idx, url) - resp, err := x.client.Get(url) - if err != nil { - return nil, err - } - defer resp.Body.Close() - if resp.StatusCode != 200 { - if resp.StatusCode == 404 { - return nil, ErrFeedParse404 - } - return nil, fmt.Errorf("Feed parcing recieved a %d Status Code", resp.StatusCode) - } - - // Decode Response - decoder := xml.NewDecoder(resp.Body) - var feed struct { - Items []models.Post `xml:"channel>item"` - } - if decoder.Decode(&feed) != nil { - return nil, err - } - - // Cleanup Response - for idx := range feed.Items { - post := &feed.Items[idx] - for i, img := range post.Media { - post.Media[i].URL = stripQuery(img.URL) - } - post.MugShot = post.Media[0].URL - post.Media = post.Media[1:] - } - return feed.Items, err -} - -func (x *feedParser) storePosts(dirty []models.Post) (err error) { - var posts []models.Post - var keys []*datastore.Key - for _, post := range dirty { - key, err := x.cleanPost(&post) - if err != nil { - continue - } - posts = append(posts, post) - keys = append(keys, key) - } - if len(keys) > 0 { - complete, err := datastore.PutMulti(x.context, keys, posts) - if err == nil { - err = keycache.AddKeys(x.context, models.POST, complete) - } - } - return err -} - -func (x *feedParser) cleanPost(p *models.Post) (*datastore.Key, error) { - id, link, err := guidToInt(p.GUID) - if err != nil { - return nil, err - } - // Remove link posts - if link { - x.context.Infof("Ignoring links post %v \"%v\"", p.GUID, p.Title) - return nil, fmt.Errorf("Ignoring links post") - } - - // Detect video only posts - video := regexp.MustCompile("\\([^&]*Video.*\\)") - if video.MatchString(p.Title) { - x.context.Infof("Ignoring video post %v \"%v\"", p.GUID, p.Title) - return nil, fmt.Errorf("Ignoring video post") - } - x.context.Infof("Storing post %v \"%v\"", p.GUID, p.Title) - - // Cleanup post titles - clean := regexp.MustCompile("\\W\\(([^\\)]*)\\)$") - p.Title = clean.ReplaceAllLiteralString(p.Title, 
"") - - // Post - // temp_key := datastore.NewIncompleteKey(x.context, DB_POST_TABLE, nil) - key := datastore.NewKey(x.context, models.POST, "", id, nil) - return key, nil -} - -func guidToInt(guid string) (int64, bool, error) { - // Remove link posts - url, err := url.Parse(guid) - if err != nil { - return -1, false, err - } - - // Parsing post id from guid url - id, err := strconv.Atoi(url.Query().Get("p")) - if err != nil { - return -1, false, err - } - return int64(id), url.Query().Get("post_type") == "sdac_links", nil +// Init initializes cron handlers +func Init() { + http.HandleFunc("/cron/stage/1", crawler.Crawl) + + http.Handle("/proj/tags", appstats.NewHandler(proj.Tags)) + http.Handle("/proj/graph", appstats.NewHandler(proj.Graph)) + http.Handle("/proj/shard", appstats.NewHandler(proj.TestShard)) + + http.HandleFunc("/cron/clean", func(w http.ResponseWriter, r *http.Request) { + c := appengine.NewContext(r) + cleanup(c, "buff") + cleanup(c, "edge") + cleanup(c, "vertex") + cleanup(c, "post") + cleanup(c, "shard-pieces") + cleanup(c, "shard-master") + }) + + http.Handle("/cron/stats", appstats.NewHandler(crawler.Stats)) + + // http.Handle("/cron/parse", appstats.NewHandler(parseFeeds)) + // http.HandleFunc("/cron/delete", delete) + http.HandleFunc("/_ah/start", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Start") + }) + http.HandleFunc("/_ah/stop", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "Stop") + }) } -func stripQuery(dirty string) string { - obj, err := url.Parse(dirty) - if err != nil { - return dirty - } - obj.RawQuery = "" - return obj.String() -} +// var ( +// // ErrFeedParse404 if feed page is not found +// ErrFeedParse404 = fmt.Errorf("Feed parcing recieved a %d Status Code", 404) +// ) +// +// func pageURL(idx int) string { +// return fmt.Sprintf("http://thechive.com/feed/?paged=%d", idx) +// } +// +// func parseFeeds(c appengine.Context, w http.ResponseWriter, r *http.Request) { +// fp := new(feedParser) +// err := fp.Main(c, w) +// if err != nil { +// http.Error(w, err.Error(), http.StatusInternalServerError) +// } else { +// fmt.Fprint(w, "Parsed") +// } +// } +// +// type feedParser struct { +// context appengine.Context +// client *http.Client +// +// todo []int +// guids map[int64]bool // this could be extremely large +// posts []models.Post +// } +// +// func (x *feedParser) Main(c appengine.Context, w http.ResponseWriter) error { +// x.context = c +// x.client = urlfetch.Client(c) +// +// // Load guids from DB +// // TODO: do this with sharded keys +// keys, err := datastore.NewQuery(models.POST).KeysOnly().GetAll(c, nil) +// if err != nil { +// c.Errorf("Error finding keys %v %v", err, appengine.IsOverQuota(err)) +// return err +// } +// x.guids = map[int64]bool{} +// for _, key := range keys { +// x.guids[key.IntID()] = true +// } +// keys = nil +// +// // // DEBUG ONLY +// // data, err := json.MarshalIndent(x.guids, "", " ") +// // fmt.Fprint(w, string(data)) +// // return err +// x.posts = make([]models.Post, 0) +// +// // Initial recursive edge case +// isStop, fullStop, err := x.isStop(1) +// if isStop || fullStop || err != nil { +// c.Infof("Finished without recursive searching %v", err) +// if err == nil { +// err = x.storePosts(x.posts) +// } +// return err +// } +// +// // Recursive search strategy +// err = x.Search(1, -1) +// +// // storePosts and processTodo +// if err == nil { +// errc := make(chan error) +// go func() { +// errc <- x.storePosts(x.posts) +// }() +// go func() { +// errc <- x.processTodo() 
+// }() +// err1, err2 := <-errc, <-errc +// if err1 != nil { +// err = err1 +// } else if err2 != nil { +// err = err2 +// } +// } +// +// if err != nil { +// c.Errorf("Error in Main %v", err) +// } +// return err +// } +// +// var processBatchDeferred = delay.Func("process-todo-batch", func(c appengine.Context, ids []int) { +// parser := feedParser{ +// context: c, +// client: urlfetch.Client(c), +// } +// parser.processBatch(ids) +// }) +// +// func (x *feedParser) processBatch(ids []int) error { +// done := make(chan error) +// for _, idx := range ids { +// go func(idx int) { +// posts, err := x.getAndParseFeed(idx) +// if err == nil { +// err = x.storePosts(posts) +// } +// done <- err +// }(idx) +// } +// for i := 0; i < len(ids); i++ { +// err := <-done +// if err != nil { +// x.context.Errorf("error storing feed (at index %d): %v", i, err) +// return err +// } +// } +// return nil +// } +// +// func (x *feedParser) processTodo() error { +// x.context.Infof("Processing TODO: %v", x.todo) +// +// var batch []int +// var task *taskqueue.Task +// var allTasks []*taskqueue.Task +// var err error +// for _, idx := range x.todo { +// if batch == nil { +// batch = make([]int, 0) +// } +// batch = append(batch, idx) +// if len(batch) >= SIZE { +// if DEFERRED { +// task, err = processBatchDeferred.Task(batch) +// if err == nil { +// allTasks = append(allTasks, task) +// } +// } else { +// err = x.processBatch(batch) +// } +// if err != nil { +// return err +// } +// batch = nil +// } +// } +// if len(batch) > 0 { +// if DEFERRED { +// task, err = processBatchDeferred.Task(batch) +// if err == nil { +// allTasks = append(allTasks, task) +// } +// } else { +// err = x.processBatch(batch) +// } +// } +// if DEFERRED && len(allTasks) > 0 { +// x.context.Infof("Adding %d task(s) to the default queue", len(allTasks)) +// taskqueue.AddMulti(x.context, allTasks, "default") +// } +// return err +// } +// +// func (x *feedParser) addRange(bottom, top int) { +// for i := bottom + 1; i < top; i++ { +// x.todo = append(x.todo, i) +// } +// } +// +// func (x *feedParser) Search(bottom, top int) (err error) { +// /* +// def infinite_length(bottom=1, top=-1): +// if bottom == 1 and not item_exists(1): return 0 # Starting edge case +// if bottom == top - 1: return bottom # Result found! (top doesn’t exist) +// if top < 0: # Searching forward +// top = bottom << 1 # Base 2 hops +// if item_exists(top): +// top, bottom = -1, top # continue searching forward +// else: # Binary search between bottom and top +// middle = (bottom + top) // 2 +// bottom, top = middle, top if item_exists(middle) else bottom, middle +// return infinite_length(bottom, top) # Tail recursion!!! +// */ +// if bottom == top-1 { +// x.context.Infof("TOP OF RANGE FOUND! @%d", top) +// x.addRange(bottom, top) +// return nil +// } +// var fullStop, isStop bool = false, false +// if top < 0 { // Searching forward +// top = bottom << 1 // Base 2 hops forward +// isStop, fullStop, err = x.isStop(top) +// if err != nil { +// return err +// } +// if !isStop { +// x.addRange(bottom, top) +// top, bottom = -1, top +// } +// } else { // Binary search between top and bottom +// middle := (bottom + top) / 2 +// isStop, fullStop, err = x.isStop(middle) +// if err != nil { +// return err +// } +// if isStop { +// top = middle +// } else { +// x.addRange(bottom, middle) +// bottom = middle +// } +// } +// if fullStop { +// return nil +// } +// return x.Search(bottom, top) // TAIL RECURSION!!! 
+// } +// +// func (x *feedParser) isStop(idx int) (isStop, fullStop bool, err error) { +// // Gather posts as necessary +// posts, err := x.getAndParseFeed(idx) +// if err == ErrFeedParse404 { +// x.context.Infof("Reached the end of the feed list (%v)", idx) +// return true, false, nil +// } +// if err != nil { +// x.context.Errorf("Error decoding ChiveFeed: %s", err) +// return false, false, err +// } +// +// // Check for Duplicates +// count := 0 +// for _, post := range posts { +// id, _, err := guidToInt(post.GUID) +// if x.guids[id] || err != nil { +// continue +// } +// count++ +// } +// x.posts = append(x.posts, posts...) +// +// // Use store_count info to determine if isStop +// isStop = count == 0 || DEBUG +// fullStop = len(posts) != count && count > 0 +// if DEBUG { +// isStop = idx > DEPTH +// fullStop = idx == DEPTH +// } +// return +// } +// +// func (x *feedParser) getAndParseFeed(idx int) ([]models.Post, error) { +// url := pageURL(idx) +// +// // Get Response +// x.context.Infof("Parsing index %v (%v)", idx, url) +// resp, err := x.client.Get(url) +// if err != nil { +// return nil, err +// } +// defer resp.Body.Close() +// if resp.StatusCode != 200 { +// if resp.StatusCode == 404 { +// return nil, ErrFeedParse404 +// } +// return nil, fmt.Errorf("Feed parcing recieved a %d Status Code", resp.StatusCode) +// } +// +// // Decode Response +// decoder := xml.NewDecoder(resp.Body) +// var feed struct { +// Items []models.Post `xml:"channel>item"` +// } +// if decoder.Decode(&feed) != nil { +// return nil, err +// } +// +// // Cleanup Response +// for idx := range feed.Items { +// post := &feed.Items[idx] +// for i, img := range post.Media { +// post.Media[i].URL = stripQuery(img.URL) +// } +// post.MugShot = post.Media[0].URL +// post.Media = post.Media[1:] +// } +// return feed.Items, err +// } +// +// func (x *feedParser) storePosts(dirty []models.Post) (err error) { +// var posts []models.Post +// var keys []*datastore.Key +// for _, post := range dirty { +// key, err := x.cleanPost(&post) +// if err != nil { +// continue +// } +// posts = append(posts, post) +// keys = append(keys, key) +// } +// if len(keys) > 0 { +// complete, err := datastore.PutMulti(x.context, keys, posts) +// if err == nil { +// err = keycache.AddKeys(x.context, models.POST, complete) +// } +// } +// return err +// } +// +// func (x *feedParser) cleanPost(p *models.Post) (*datastore.Key, error) { +// id, link, err := guidToInt(p.GUID) +// if err != nil { +// return nil, err +// } +// // Remove link posts +// if link { +// x.context.Infof("Ignoring links post %v \"%v\"", p.GUID, p.Title) +// return nil, fmt.Errorf("Ignoring links post") +// } +// +// // Detect video only posts +// video := regexp.MustCompile("\\([^&]*Video.*\\)") +// if video.MatchString(p.Title) { +// x.context.Infof("Ignoring video post %v \"%v\"", p.GUID, p.Title) +// return nil, fmt.Errorf("Ignoring video post") +// } +// x.context.Infof("Storing post %v \"%v\"", p.GUID, p.Title) +// +// // Cleanup post titles +// clean := regexp.MustCompile("\\W\\(([^\\)]*)\\)$") +// p.Title = clean.ReplaceAllLiteralString(p.Title, "") +// +// // Post +// // temp_key := datastore.NewIncompleteKey(x.context, DB_POST_TABLE, nil) +// key := datastore.NewKey(x.context, models.POST, "", id, nil) +// return key, nil +// } +// +// func guidToInt(guid string) (int64, bool, error) { +// // Remove link posts +// url, err := url.Parse(guid) +// if err != nil { +// return -1, false, err +// } +// +// // Parsing post id from guid url +// id, err := 
strconv.Atoi(url.Query().Get("p")) +// if err != nil { +// return -1, false, err +// } +// return int64(id), url.Query().Get("post_type") == "sdac_links", nil +// } +// +// func stripQuery(dirty string) string { +// obj, err := url.Parse(dirty) +// if err != nil { +// return dirty +// } +// obj.RawQuery = "" +// return obj.String() +// } diff --git a/app/cron/parser.go b/app/cron/parser.go new file mode 100644 index 0000000..e869203 --- /dev/null +++ b/app/cron/parser.go @@ -0,0 +1,63 @@ +package cron + +import ( + // "app/models" + // "app/helpers/keycache" + // "appengine" + // "appengine/datastore" + // "appengine/delay" + // "appengine/taskqueue" + // "appengine/urlfetch" + "encoding/xml" + // "encoding/json" + // "fmt" + // "net/http" + "html/template" +) + +type Node struct { + // XML string `xml:",innerxml"` + // ATTR []string + // DATA string `xml:",chardata"` + XMLName xml.Name + XMLAttrs []xml.Attr `xml:",any"` + DATA string `xml:",chardata"` +} + +type Post struct { + Guid string `xml:"guid"` + Tags []string `xml:"category"` + Link string `xml:"link"` + Date string `xml:"pubDate"` + Title string `xml:"title"` + Creator string `xml:"creator"` + Media []Img `xml:"content"` + CommentRSS string `xml:"commentRss"` + Comment []string `xml:"comments"` + Desc template.HTML `xml:"description"` + Enclosure struct { + Url string `xml:"url,attr"` + Children []Node `xml:",any"` + } `xml:"enclosure"` + Thumbnail struct { + Url string `xml:"url,attr"` + Children []Node `xml:",any"` + } `xml:"thumbnail"` + Children []Node `xml:",any"` + Content template.HTML `xml:"encoded"` +} + +type Img struct { + Url string `xml:"url,attr"` + Title string `xml:"title"` + Rating string `xml:"rating"` + Category string `xml:"category"` +} + +// Worker: this will be a worker on defered work chains + +func parseData(data string) (*Post, error) { + var post Post + err := xml.Unmarshal([]byte(data), &post) + return &post, err +} diff --git a/app/cron/proj/graph.go b/app/cron/proj/graph.go new file mode 100644 index 0000000..ab7e7aa --- /dev/null +++ b/app/cron/proj/graph.go @@ -0,0 +1,127 @@ +package proj + +import ( + "bytes" + "net/http" + "strings" + "time" + + "appengine" + + "github.com/bign8/chive-show/app/cron/proj/graph" + "github.com/bign8/chive-show/app/helpers/sharder" +) + +// TestShard to delete +func TestShard(c appengine.Context, w http.ResponseWriter, r *http.Request) { + + data := []byte(strings.Repeat("01234567890123456789", 1e6)) + + // Writing + start := time.Now() + err := sharder.Writer(c, "test", data) + if err != nil { + c.Errorf("Writer Error: %s", err) + return + } + c.Infof("Write took: %v", time.Since(start)) + + // Reading + start = time.Now() + read, err := sharder.Reader(c, "test") + if err != nil { + c.Errorf("Reader Error: %s", err) + return + } + c.Infof("Data Length: %d; isSame: %v", len(read), bytes.Equal(read, data)) + c.Infof("Read took: %v", time.Since(start)) +} + +// Graph processes all posts in attempt to create a graph +func Graph(c appengine.Context, w http.ResponseWriter, r *http.Request) { + start := time.Now() + + var item Item + var post, ntag, nimg graph.NodeID + + idx := 0 + timeout := time.After(time.Second) + g := graph.New(false) + for idk := range getItems(c) { + item = idk.(Item) + post = g.Add(item.GUID, graph.NodeType_POST, 0) + + for _, tag := range validTags(item.Tags) { + ntag = g.Add(tag, graph.NodeType_TAG, 1) + g.Connect(post, ntag, 1) + } + + for _, img := range item.Imgs { + nimg = g.Add(img, graph.NodeType_IMG, 1) + g.Connect(post, nimg, 1) + } + + 
// This is a DEBUG only operation + select { + case <-timeout: + c.Infof("Index: %d; Duration: %v", idx, time.Since(start)) + timeout = time.After(time.Second) + default: + } + idx++ + } + c.Infof("End Loop: %d; Duration: %v", idx, time.Since(start)) + + // Write result + bits, err := g.Bytes() + if err != nil { + c.Errorf("Error in Graph.Bytes: %v", err) + } + c.Infof("End Serialization: Len(%d); Duration: %v", len(bits), time.Since(start)) + + // Storage + if err := sharder.Writer(c, "graph", bits); err != nil { + c.Errorf("Writer Error: %s", err) + return + } + c.Infof("Write Complete; Duration: %v", time.Since(start)) + + // Count types of nodes + binCtr := make(map[graph.NodeType]uint64) + for _, node := range g.Nodes() { + binCtr[node.Type]++ + } + + // Log out types of nodes + total := uint64(0) + for key, value := range binCtr { + c.Infof("Nodes (%s): %d", key, value) + total += value + } + c.Infof("Nodes (ALL): %d", total) + + // w/dupes w/invalid tags + // INFO: Nodes (IMG): 928728 + // INFO: Nodes (TAG): 244212 + // INFO: Nodes (POST): 40920 + // INFO: Nodes (ALL): 1213860 + // INFO: Time took: 31.310686059s + + // w/dupes w/o invalid Tags + // INFO: Nodes (IMG): 928728 + // INFO: Nodes (TAG): 237122 + // INFO: Nodes (POST): 40920 + // INFO: Nodes (ALL): 1206770 + // INFO: Time took: 31.850210891s + + // w/o dupes w/o invalid Tags + // INFO: Nodes (IMG): 886831 + // INFO: Nodes (POST): 40920 + // INFO: Nodes (TAG): 18221 + // INFO: Nodes (ALL): 945972 + // INFO: Time took: 32.651739532s + + // TODO: write to sharded datastore entity + + c.Infof("Time took: %v", time.Since(start)) +} diff --git a/app/cron/proj/graph/graph.go b/app/cron/proj/graph/graph.go new file mode 100644 index 0000000..4bcae7a --- /dev/null +++ b/app/cron/proj/graph/graph.go @@ -0,0 +1,143 @@ +package graph + +import "errors" + +// TODO: add some graph processing functions + +// NodeID is a graph identifier +type NodeID uint64 + +// Graph is the serializable graph we have all been looking for +type Graph struct { + s *SerialGraph + dupes map[NodeType]map[string]NodeID // type > value > node +} + +// New creates a new Graph +func New(isDirected bool) *Graph { + return &Graph{ + s: &SerialGraph{ + Nodes: make(map[uint64]*Node), + Directed: isDirected, + NodeCount: 0, + }, + dupes: make(map[NodeType]map[string]NodeID), + } +} + +// Get returns an associated node for a given ID +func (g *Graph) Get(id NodeID) *Node { + return g.s.Nodes[uint64(id)] +} + +// Add creates and adds a node to the graph +func (g *Graph) Add(value string, ttype NodeType, weight int64) NodeID { + + // Check duplicate node (add weight) + dupe := g.dupes[ttype][value] + if dupe != 0 { + g.Get(dupe).Weight += weight + return dupe + } + + // Create new node + id := g.genNodeID() + n := &Node{ + Value: value, + Weight: weight, + Type: ttype, + Adjacent: make(map[uint64]int64, 0), + } + g.s.Nodes[id] = n + + // Add dupe check to list + dub, ok := g.dupes[ttype] + if !ok { + dub = make(map[string]NodeID) + g.dupes[ttype] = dub + } + nid := NodeID(id) + dub[value] = nid + return nid +} + +// Connect connects nodes to and from with an edge of weight w +func (g *Graph) Connect(from, to NodeID, weight int64) error { + if to == 0 || from == 0 { + return errors.New("Cannot add edge to nil node") + } + g.Get(from).Adjacent[uint64(to)] += weight // Directed edge + if !g.s.Directed { + g.Get(to).Adjacent[uint64(from)] += weight // UnDirected edge (return trip) + } + return nil +} + +func (g *Graph) genNodeID() (id uint64) { + g.s.NodeCount++ + id = 
g.s.NodeCount + return id +} + +// Nodes returns all the nodes in the Graph +func (g *Graph) Nodes() []*Node { + n := make([]*Node, len(g.s.Nodes)) + ctr := 0 + for _, node := range g.s.Nodes { + n[ctr] = node + ctr++ + } + return n +} + +// DecodeGraph hydrates a graph from a serialized format (returned by Bytes()). +func DecodeGraph(data []byte) (*Graph, error) { + sg, err := DecodeSerialGraph(data) + if err != nil { + return nil, err + } + g := New(false) // Don't care about directed because it's stored on s (assigned below) + g.s = sg + + // Hydrate Graph from SerialGraph + for id, node := range sg.Nodes { + nn := g.dupes[node.Type] + if nn == nil { + nn = make(map[string]NodeID) + g.dupes[node.Type] = nn + } + nn[node.Value] = NodeID(id) + } + return g, nil +} + +// Bytes flattens a graph to a flat file format +func (g *Graph) Bytes() ([]byte, error) { + return g.s.Bytes() +} + +// func main() { +// log.Println("Do stuff...") +// +// graph := New(false) +// a := graph.Add("http://super-stupid-long-url.com/more-crap-over-here1", NodeType_UNKNOWN, 0) +// b := graph.Add("http://super-stupid-long-url.com/more-crap-over-here2", NodeType_UNKNOWN, 0) +// graph.Connect(a, b, 0) +// +// // Compress +// bits, err := graph.Bytes() +// if err != nil { +// panic(err) +// } +// +// // Decompress +// result, err := DecodeGraph(bits) +// if err != nil { +// panic(err) +// } +// +// // Compare +// log.Printf("Message (%d): %q", len(bits), string(bits)) +// log.Printf("Digit:\n%v\n%v", graph, result) +// log.Printf("Nodes:\n%v\n%v", graph.s.Nodes, result.s.Nodes) +// } diff --git a/app/cron/proj/graph/graph.pb.go b/app/cron/proj/graph/graph.pb.go new file mode 100644 index 0000000..9e19a04 --- /dev/null +++ b/app/cron/proj/graph/graph.pb.go @@ -0,0 +1,118 @@ +// Code generated by protoc-gen-go. +// source: graph.proto +// DO NOT EDIT! + +/* +Package graph is a generated protocol buffer package. + +It is generated from these files: + graph.proto + +It has these top-level messages: + SerialGraph + Node +*/ +package graph + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" + +// Reference imports to suppress errors if they are not otherwise used. 
+var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +type NodeType int32 + +const ( + NodeType_UNKNOWN NodeType = 0 + NodeType_POST NodeType = 1 + NodeType_IMG NodeType = 2 + NodeType_TAG NodeType = 3 + NodeType_USER NodeType = 4 +) + +var NodeType_name = map[int32]string{ + 0: "UNKNOWN", + 1: "POST", + 2: "IMG", + 3: "TAG", + 4: "USER", +} +var NodeType_value = map[string]int32{ + "UNKNOWN": 0, + "POST": 1, + "IMG": 2, + "TAG": 3, + "USER": 4, +} + +func (x NodeType) String() string { + return proto.EnumName(NodeType_name, int32(x)) +} +func (NodeType) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +type SerialGraph struct { + Nodes map[uint64]*Node `protobuf:"bytes,1,rep,name=nodes" json:"nodes,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + Directed bool `protobuf:"varint,2,opt,name=directed" json:"directed,omitempty"` + NodeCount uint64 `protobuf:"varint,3,opt,name=nodeCount" json:"nodeCount,omitempty"` +} + +func (m *SerialGraph) Reset() { *m = SerialGraph{} } +func (m *SerialGraph) String() string { return proto.CompactTextString(m) } +func (*SerialGraph) ProtoMessage() {} +func (*SerialGraph) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *SerialGraph) GetNodes() map[uint64]*Node { + if m != nil { + return m.Nodes + } + return nil +} + +type Node struct { + Value string `protobuf:"bytes,1,opt,name=value" json:"value,omitempty"` + Weight int64 `protobuf:"varint,2,opt,name=weight" json:"weight,omitempty"` + Type NodeType `protobuf:"varint,3,opt,name=type,enum=graph.NodeType" json:"type,omitempty"` + Adjacent map[uint64]int64 `protobuf:"bytes,4,rep,name=adjacent" json:"adjacent,omitempty" protobuf_key:"varint,1,opt,name=key" protobuf_val:"varint,2,opt,name=value"` +} + +func (m *Node) Reset() { *m = Node{} } +func (m *Node) String() string { return proto.CompactTextString(m) } +func (*Node) ProtoMessage() {} +func (*Node) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *Node) GetAdjacent() map[uint64]int64 { + if m != nil { + return m.Adjacent + } + return nil +} + +func init() { + proto.RegisterEnum("graph.NodeType", NodeType_name, NodeType_value) +} + +var fileDescriptor0 = []byte{ + // 294 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x54, 0x91, 0x41, 0x4b, 0xc3, 0x40, + 0x10, 0x85, 0xdd, 0xee, 0xb6, 0x4d, 0x67, 0x69, 0x5d, 0xe7, 0x14, 0x03, 0x85, 0xd2, 0x53, 0x51, + 0x89, 0x10, 0x2f, 0x2a, 0x78, 0x28, 0x52, 0x82, 0x88, 0xa9, 0x98, 0x14, 0xcf, 0xb1, 0x59, 0xda, + 0x68, 0x49, 0x42, 0xdc, 0x2a, 0xf9, 0x2d, 0xde, 0xfc, 0xa5, 0x66, 0xd7, 0x2a, 0xe9, 0x2d, 0x33, + 0xef, 0x7d, 0xf3, 0x5e, 0x58, 0xe0, 0xab, 0x32, 0x2e, 0xd6, 0x6e, 0x51, 0xe6, 0x2a, 0xc7, 0xb6, + 0x19, 0xc6, 0x5f, 0x04, 0x78, 0x28, 0xcb, 0x34, 0xde, 0xf8, 0x7a, 0xc6, 0x33, 0x68, 0x67, 0x79, + 0x22, 0xdf, 0x6d, 0x32, 0xa2, 0x13, 0xee, 0x0d, 0xdd, 0x5f, 0xa6, 0x61, 0x71, 0x03, 0xad, 0xcf, + 0x32, 0x55, 0x56, 0x28, 0xc0, 0x4a, 0xd2, 0x52, 0x2e, 0x95, 0x4c, 0xec, 0xd6, 0x88, 0x4c, 0x2c, + 0x3c, 0x82, 0x9e, 0xe6, 0x6f, 0xf3, 0x6d, 0xa6, 0x6c, 0x5a, 0xaf, 0x98, 0x73, 0x05, 0xd0, 0x40, + 0x38, 0xd0, 0x37, 0x59, 0xd5, 0xe7, 0x6b, 0x09, 0x1d, 0x68, 0x7f, 0xc4, 0x9b, 0xad, 0x34, 0x30, + 0xf7, 0xf8, 0x2e, 0x4d, 0xdb, 0xaf, 0x5b, 0x97, 0x64, 0xfc, 0x4d, 0x80, 0xe9, 0x01, 0xfb, 0x7f, + 0x46, 0xcd, 0xf5, 0x70, 0x00, 0x9d, 0x4f, 0x99, 0xae, 0xd6, 0xca, 0x80, 0x14, 0x87, 0xc0, 0x54, + 0x55, 0x48, 0x13, 0x38, 0xf0, 0x0e, 0x1b, 0x67, 0xa2, 0x7a, 0x8d, 
0xa7, 0x60, 0xc5, 0xc9, 0x6b, 0xbc, 0x94, 0x75, 0x27, 0x66, 0xfe, 0xeb, 0xb8, 0x61, 0x71, 0xa7, 0x3b, 0xcd, 0x14, 0x74, 0xce, 0xa1, 0xbf, 0xb7, 0xd8, 0x6f, 0xdc, 0x6f, 0x36, 0xa6, 0xba, 0xe4, 0xc9, 0x0d, 0x58, 0xff, 0x49, 0x1c, 0xba, 0x8b, 0xe0, 0x3e, 0x98, 0x3f, 0x07, 0xe2, 0x00, 0x2d, 0x60, 0x8f, 0xf3, 0x30, 0x12, 0x04, 0xbb, 0x40, 0xef, 0x1e, 0x7c, 0xd1, 0xd2, 0x1f, 0xd1, 0xd4, 0x17, 0x54, 0x6b, 0x8b, 0x70, 0xf6, 0x24, 0xd8, 0x4b, 0xc7, 0xbc, 0xc7, 0xc5, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xbf, 0x47, + 0x15, 0xe3, 0x9e, 0x01, 0x00, 0x00, +} diff --git a/app/cron/proj/graph/graph.proto b/app/cron/proj/graph/graph.proto new file mode 100644 index 0000000..3d9bee5 --- /dev/null +++ b/app/cron/proj/graph/graph.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; +package graph; + +message SerialGraph { + map<uint64, Node> nodes = 1; + bool directed = 2; + uint64 nodeCount = 3; +} + +enum NodeType { + UNKNOWN = 0; + POST = 1; + IMG = 2; + TAG = 3; + USER = 4; +} + +message Node { + string value = 1; + int64 weight = 2; + NodeType type = 3; + map<uint64, int64> adjacent = 4; +} diff --git a/app/cron/proj/graph/load.sh b/app/cron/proj/graph/load.sh new file mode 100755 index 0000000..b45f7ee --- /dev/null +++ b/app/cron/proj/graph/load.sh @@ -0,0 +1,14 @@ +#!/bin/sh +set -e +export PATH=$PATH:$GOPATH/bin +if ! which protoc-gen-go >/dev/null; then + echo "Installing proto and protoc-gen-go" + go get -u github.com/golang/protobuf/{proto,protoc-gen-go} +else + echo "Proto and protoc-gen-go already installed" +fi + +echo "Generating Protobuf files..." +protoc --go_out=. *.proto +sed -i '' '/RegisterType/d' graph.pb.go +echo "Protobuf files generated." diff --git a/app/cron/proj/graph/serialGraph.go b/app/cron/proj/graph/serialGraph.go new file mode 100644 index 0000000..af059f2 --- /dev/null +++ b/app/cron/proj/graph/serialGraph.go @@ -0,0 +1,76 @@ +package graph + +import ( + "bytes" + "compress/gzip" + + "github.com/golang/protobuf/proto" +) + +const shouldGZIP = true + +// DecodeSerialGraph converts a byte slice back into a hydrated SerialGraph.
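+// The input is expected to be the gzip-compressed output of Bytes when shouldGZIP is true.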
+func DecodeSerialGraph(data []byte) (g *SerialGraph, err error) { + if shouldGZIP { + if data, err = decompress(data); err != nil { + return nil, err + } + } + + // log.Printf("DecodeSerialGraph: %q", data) + + g = &SerialGraph{} + if err := proto.Unmarshal(data, g); err != nil { + return nil, err + } + return g, nil +} + +// Bytes converts a serial graph to a gzipped graph (used for storage) +func (g *SerialGraph) Bytes() (data []byte, err error) { + data, err = proto.Marshal(g) + if err != nil { + return nil, err + } + + // log.Printf(" Graph.Bytes: %q", data) + + if shouldGZIP { + if data, err = compress(data); err != nil { + return nil, err + } + } + return data, nil +} + +// Simple GZIP decompression +func decompress(garbage []byte) ([]byte, error) { + gz, err := gzip.NewReader(bytes.NewBuffer(garbage)) + if err != nil { + return nil, err + } + var buff bytes.Buffer + if _, err := buff.ReadFrom(gz); err != nil { + return nil, err + } + if err := gz.Close(); err != nil { + return nil, err + } + return buff.Bytes(), nil +} + +// Simple GZIP compression +func compress(data []byte) ([]byte, error) { + var buff bytes.Buffer + gz := gzip.NewWriter(&buff) + if _, err := gz.Write(data); err != nil { + return nil, err + } + if err := gz.Flush(); err != nil { + return nil, err + } + if err := gz.Close(); err != nil { + return nil, err + } + return buff.Bytes(), nil +} diff --git a/app/cron/proj/proj.go b/app/cron/proj/proj.go new file mode 100644 index 0000000..525bfd9 --- /dev/null +++ b/app/cron/proj/proj.go @@ -0,0 +1,88 @@ +package proj + +import ( + "bytes" + "encoding/xml" + + "appengine" + "appengine/datastore" + + "github.com/bign8/chive-show/app/cron/chain" + "github.com/bign8/chive-show/app/cron/crawler" +) + +// XMLPage xml processor for a page +type XMLPage struct { + Items []struct { + GUID string `xml:"guid"` + Tags []string `xml:"category"` + Imgs []struct { + URL string `xml:"url,attr"` + } `xml:"content"` + } `xml:"channel>item"` +} + +// Item is a post item +type Item struct { + GUID string + Tags []string + Imgs []string +} + +func getItems(c appengine.Context) <-chan interface{} { + pages := puller(c) + return chain.FanOut(50, 10000, pages, flatten(c)) +} + +func puller(c appengine.Context) <-chan interface{} { + out := make(chan interface{}, 10000) + + // TODO: improve pulling performance (cache number of xml in stage_1, fan out pulling) + go func() { + defer close(out) + q := datastore.NewQuery(crawler.XML) + iterator := q.Run(c) + for { + var s crawler.Store + _, err := iterator.Next(&s) + if err == datastore.Done { + break // No further entities match the query. 
+ } + if err != nil { + c.Errorf("fetching next stored page: %v", err) + break + } + out <- s.XML + } + }() + return out +} + +func flatten(c appengine.Context) chain.Worker { + return func(obj interface{}, out chan<- interface{}, idx int) { + var xmlPage XMLPage + var imgs []string + + // Clean FormFeed characters from data + data := bytes.Replace(obj.([]byte), []byte("\u000C"), nil, -1) + + // Start up decoder + decoder := xml.NewDecoder(bytes.NewReader(data)) + decoder.Entity = xml.HTMLEntity + + // Parse the XML of an object + if err := decoder.Decode(&xmlPage); err != nil { + c.Errorf("Flatten %d: %v", idx, err) + return + } + + // Process items in a particular page + for _, item := range xmlPage.Items { + imgs = make([]string, len(item.Imgs)) + for i, img := range item.Imgs { + imgs[i] = img.URL + } + out <- Item{item.GUID, item.Tags, imgs} + } + } +} diff --git a/app/cron/proj/tags.go b/app/cron/proj/tags.go new file mode 100644 index 0000000..e28bf8e --- /dev/null +++ b/app/cron/proj/tags.go @@ -0,0 +1,115 @@ +package proj + +import ( + "bytes" + "fmt" + "net/http" + "runtime" + "strings" + "time" + + "github.com/bign8/chive-show/app/cron/chain" + + "appengine" + "appengine/memcache" +) + +const tagsMemcacheKey = "tags-baby" + +// Tags retrieves the tags from the dataset +func Tags(c appengine.Context, w http.ResponseWriter, r *http.Request) { + start := time.Now() + defer func() { + c.Infof("Time took: %v", time.Since(start)) + }() + // w.Header().Set("Content-Type", "text/csv; charset=utf-8") + + // Check from memcache + if item, err := memcache.Get(c, tagsMemcacheKey); err == nil { + w.Write(item.Value) + return + } + + // Pretty sure this doesn't work on prod, but works awesome in dev + runtime.GOMAXPROCS(runtime.NumCPU()) + tags := chain.FanOut(50, 10000, getItems(c), tags) // Pull and clean tags + + // Build a counter dictionary + found := map[string]int64{} + for tag := range tags { + found[tag.(string)]++ + } + + // Compute average (used to clip data, so it's not huge) + avg := int64(0) + for _, value := range found { + avg += value + } + avg /= int64(len(found)) + c.Infof("Num tags: %v; Avg: %v", len(found), avg) + + // Compute the 75%-tile + cap := int64(0) + for key, value := range found { + if avg <= value { + cap += value + } else { + delete(found, key) + } + } + cap /= int64(len(found)) + c.Infof("Above average tags: %v; 75%%-tile: %v", len(found), cap) + + // Output results + var buffer bytes.Buffer + result := int64(0) + for key, value := range found { + if cap <= value { + buffer.WriteString(fmt.Sprintf("%s,%d\n", key, value)) + result++ + } + } + data := buffer.Bytes() + w.Write(data) + c.Infof("Returned tags: %v", result) + + // Save to memcache, but only wait up to 3ms.
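+	// The Set itself keeps running in its goroutine even after the timeout
+	// fires; the select below only bounds how long this handler waits for it.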
+ done := make(chan bool, 1) + go func() { + memcache.Set(c, &memcache.Item{ + Key: tagsMemcacheKey, + Value: data, + }) + done <- true + }() + select { + case <-done: + case <-time.After(3 * time.Millisecond): + } +} + +func tags(obj interface{}, out chan<- interface{}, idx int) { + for _, tag := range validTags((obj.(Item)).Tags) { + out <- tag + } +} + +// http://xpo6.com/list-of-english-stop-words/ +var chiveWords = "web only,thebrigade,theberry,thechive,chive,chive humanity," +var stopWords = chiveWords + "a,able,about,across,after,all,almost,also,am,among,an,and,any,are,as,at,be,because,been,but,by,can,cannot,could,dear,did,do,does,either,else,ever,every,for,from,get,got,had,has,have,he,her,hers,him,his,how,however,i,if,in,into,is,it,its,just,least,let,like,likely,may,me,might,most,must,my,neither,no,nor,not,of,off,often,on,only,or,other,our,own,rather,said,say,says,she,should,since,so,some,than,that,the,their,them,then,there,these,they,this,tis,to,too,twas,us,wants,was,we,were,what,when,where,which,while,who,whom,why,will,with,would,yet,you,your" +var stops = map[string]bool{} + +func validTags(tags []string) (res []string) { + if len(stops) == 0 { + for _, s := range strings.Split(stopWords, ",") { + stops[s] = true + } + } + for _, tag := range tags { + tag = strings.ToLower(tag) + if !stops[tag] { + res = append(res, tag) + } + } + return +} diff --git a/app/helpers/sharder/reader.go b/app/helpers/sharder/reader.go new file mode 100644 index 0000000..29c9950 --- /dev/null +++ b/app/helpers/sharder/reader.go @@ -0,0 +1,44 @@ +package sharder + +import ( + "sync" + + "appengine" + "appengine/datastore" +) + +// Reader creates a new shard reader to retrieve data from datastore +func Reader(c appengine.Context, name string) ([]byte, error) { + if name == "" { + return nil, ErrInvalidName + } + + var master shardMaster + if err := datastore.Get(c, masterKey(c, name), &master); err != nil { + panic(err) + return nil, err + } + shards := numShards(master.Size) + + var wg sync.WaitGroup + wg.Add(shards) + data := make([]byte, master.Size) + for i := 0; i < shards; i++ { + go func(i int) { + var shardData shard + if err := datastore.Get(c, shardKey(c, name, i), &shardData); err != nil { + panic(err) + } + // c.Infof("Out Data %d: %q", i, string(shardData.Data)) + + end := i*divisor + divisor + if end > master.Size { + end = master.Size + } + copy(data[i*divisor:end], shardData.Data) + wg.Done() + }(i) + } + wg.Wait() + return data, nil +} diff --git a/app/helpers/sharder/sharder.go b/app/helpers/sharder/sharder.go new file mode 100644 index 0000000..4fc6316 --- /dev/null +++ b/app/helpers/sharder/sharder.go @@ -0,0 +1,43 @@ +package sharder + +import ( + "errors" + "fmt" + + "appengine" + "appengine/datastore" +) + +// TODO: datastore.RunInTransaction +// TODO: delete existing shards greater than current +// TODO: don't panic and actually use error chans +// TODO: possibly use put and get multi for up to 10MB + +const ( + masterKind = "shard-master" + shardKind = "shard-pieces" + divisor = 1e6 // 1MB +) + +// ErrInvalidName because reasons +var ErrInvalidName = errors.New("Must provide name of sharded item") + +func masterKey(c appengine.Context, name string) *datastore.Key { + return datastore.NewKey(c, masterKind, name, 0, nil) +} + +func shardKey(c appengine.Context, name string, idx int) *datastore.Key { + return datastore.NewKey(c, shardKind, fmt.Sprintf("%s-%d", name, idx), 0, nil) +} + +func numShards(size int) int { + return (size-1)/divisor + 1 +} + +type shardMaster struct { + 
Size int `datastore:"size"` +} + +type shard struct { + Data []byte `datastore:"data"` +} diff --git a/app/helpers/sharder/sharder_test.go b/app/helpers/sharder/sharder_test.go new file mode 100644 index 0000000..a66928f --- /dev/null +++ b/app/helpers/sharder/sharder_test.go @@ -0,0 +1,51 @@ +package sharder + +import ( + "bytes" + "strings" + "testing" + + "appengine/aetest" +) + +func TestFullCircle(t *testing.T) { + // TODO: verify 20 shards + + c, err := aetest.NewContext(nil) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + data := []byte(strings.Repeat("01234567890123456789", 1e6)) + + // Writing + err = Writer(c, "test", data) + if err != nil { + t.Fatal(err) + } + + // Reading + read, err := Reader(c, "test") + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(read, data) { + t.Fail() + } +} + +var test bool + +func BenchmarkFullCycle(b *testing.B) { + c, _ := aetest.NewContext(nil) + defer c.Close() + data := []byte(strings.Repeat("1", 1e6)) + + for i := 0; i < b.N; i++ { + Writer(c, "test", data) + read, _ := Reader(c, "test") + test = bytes.Equal(read, data) + } +} diff --git a/app/helpers/sharder/writer.go b/app/helpers/sharder/writer.go new file mode 100644 index 0000000..7d018e0 --- /dev/null +++ b/app/helpers/sharder/writer.go @@ -0,0 +1,61 @@ +package sharder + +import ( + "sync" + + "appengine" + "appengine/datastore" +) + +// Writer shards and stores a byte slice +func Writer(c appengine.Context, name string, data []byte) error { + if name == "" { + return ErrInvalidName + } + + // Attempt to get existing key + key := masterKey(c, name) + oldMaster := shardMaster{} + oldShards := 0 + if datastore.Get(c, key, &oldMaster) == nil { + oldShards = numShards(oldMaster.Size) + } + + // Store shardMaster + master := shardMaster{len(data)} + shards := numShards(master.Size) + if _, err := datastore.Put(c, key, &master); err != nil { + return err + } + + // shard data and store shards + var wg sync.WaitGroup + wg.Add(shards) + for i := 0; i < shards; i++ { + go func(i int) { + shardKey := shardKey(c, name, i) + shardData := data[i*divisor:] + if len(shardData) > divisor { + shardData = shardData[:divisor] + } + s := shard{shardData} + // w.ctx.Infof("Inn Data %d: %q", i, s.Data) + if _, err := datastore.Put(c, shardKey, &s); err != nil { + panic(err) + } + wg.Done() + }(i) + } + + // Delete shards that shouldn't be in datastore (write something smaller than before) + if oldShards > shards { + keys := make([]*datastore.Key, oldShards-shards) + for i := shards; i < oldShards; i++ { + keys[i-shards] = shardKey(c, name, i) + } + datastore.DeleteMulti(c, keys) + } + + wg.Wait() + return nil +} diff --git a/app.yaml b/yaml/app.yaml similarity index 59% rename from app.yaml rename to yaml/app.yaml index ebcc2cc..6efa065 100644 --- a/app.yaml +++ b/yaml/app.yaml @@ -8,19 +8,15 @@ skip_files: handlers: - url: /static - static_dir: static + static_dir: ../static - url: / - static_files: static/index.html - upload: static/index.html + static_files: ../static/index.html + upload: ../static/index.html - url: /(favicon\.ico|index\.html) - static_files: static/\1 - upload: static/(favicon\.ico|index\.html) - -- url: /cron/.* - script: _go_app - login: admin + static_files: ../static/\1 + upload: ../static/(favicon\.ico|index\.html) - url: /.* script: _go_app diff --git a/cron.yaml b/yaml/cron.yaml similarity index 85% rename from cron.yaml rename to yaml/cron.yaml index 407d25e..08dbe64 100644 --- a/cron.yaml +++ b/yaml/cron.yaml @@ -2,3 +2,4 @@ cron: - description: Parse feeds from
source url: /cron/parse schedule: every 6 hours + target: cron diff --git a/index.yaml b/yaml/index.yaml similarity index 100% rename from index.yaml rename to yaml/index.yaml diff --git a/yaml/main.go b/yaml/main.go new file mode 100644 index 0000000..6cd9712 --- /dev/null +++ b/yaml/main.go @@ -0,0 +1,16 @@ +package main + +import ( + "net/http" + + "github.com/bign8/chive-show/app/api" + "github.com/bign8/chive-show/app/cron" +) + +func init() { + http.HandleFunc("/", http.NotFound) // Default Handler + + // Set up the other routes + api.Init() + cron.Init() +} diff --git a/yaml/module-cron.yaml b/yaml/module-cron.yaml new file mode 100644 index 0000000..c9ed0dd --- /dev/null +++ b/yaml/module-cron.yaml @@ -0,0 +1,26 @@ +application: crucial-alpha-706 +module: cron +version: uno +runtime: go +api_version: go1 +instance_class: B1 +basic_scaling: + max_instances: 1 + idle_timeout: 30m + +skip_files: +- test/* + +handlers: +- url: /cron/.* + script: _go_app + login: admin + +- url: /proj/.* + script: _go_app + login: admin + +error_handlers: + - file: err/default.html + - error_code: over_quota + file: err/over_quota.html
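For reference, a minimal sketch of how the new chain primitives are intended to be composed. This is illustrative only and not part of the change: the squaring Worker, buffer sizes, and main wrapper are invented for the example.

package main

import (
	"fmt"

	"github.com/bign8/chive-show/app/cron/chain"
)

func main() {
	// Feed a buffered input channel and close it so FanOut's workers terminate.
	in := make(chan interface{}, 10)
	for i := 0; i < 10; i++ {
		in <- i
	}
	close(in)

	// A Worker receives one item and may emit any number of results on out.
	square := func(obj interface{}, out chan<- interface{}, idx int) {
		n := obj.(int)
		out <- n * n
	}

	// Run 4 workers over the input; FanIn would merge several such streams.
	for v := range chain.FanOut(4, 10, in, square) {
		fmt.Println(v)
	}
}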