Skip to content

Commit

Permalink
cryptic commit msg. Enhance worker bee selections
Browse files Browse the repository at this point in the history
  • Loading branch information
bcambel committed Jul 22, 2017
1 parent 437f2f5 commit cddb8b8
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 46 deletions.
1 change: 1 addition & 0 deletions project.clj
Expand Up @@ -60,6 +60,7 @@
[hikari-cp "1.7.5"]
[org.clojars.runa/clj-kryo "1.5.0"]
[org.clojure/tools.namespace "0.3.0-alpha4"]
[com.stuartsierra/frequencies "0.1.0"]
]
:plugins [[lein-environ "1.0.0"]
[s3-wagon-private "1.1.2"]]
Expand Down
22 changes: 22 additions & 0 deletions sql/list_indices.sql
@@ -0,0 +1,22 @@
select
t.relname as table_name,
i.relname as index_name,
array_to_string(array_agg(a.attname), ', ') as column_names
from
pg_class t,
pg_class i,
pg_index ix,
pg_attribute a
where
t.oid = ix.indrelid
and i.oid = ix.indexrelid
and a.attrelid = t.oid
and a.attnum = ANY(ix.indkey)
and t.relkind = 'r'
and t.relname like 'github_project%'
group by
t.relname,
i.relname
order by
t.relname,
i.relname;
92 changes: 60 additions & 32 deletions src/clj/hsm/integration/ghub.clj
Expand Up @@ -156,11 +156,13 @@
; (log/info url)
(when-let [result (try
(dd/timed {} "worker.bee.call" {:service "oss"}
(-> (client/get url)
(-> (client/get url header-settings)
(get :body)
(parse-string true)))
(catch Exception ex
(log/error ex)))]
(do
(log/warnf "Failed in bee %s for %s %s" bee task param)
(log/error ex))))]
result)))

(defn clean-next-url
Expand Down Expand Up @@ -189,8 +191,10 @@
(when-let [reset (to-int (:X-RateLimit-Reset report))]
(let [now (epoch-sec)
credits-left (to-int (:X-RateLimit-Remaining report))
seconds-left (- reset now)
seconds-left (if (zero? seconds-left) 1 seconds-left)
report (merge report {:bee bee :rate (if (zero? credits-left) 0
(float (/ (- reset now) credits-left)))})]
(float (/ credits-left seconds-left )))})]
(wcar {:pool {} :spec {:host "localhost" :port 6379}}
(car/hset "oss.worker.bee.state" bee (generate-string report))
(car/zadd "oss.worker.bee.tokens" credits-left bee )
Expand All @@ -201,18 +205,30 @@
;; bee has time reset time already set, so don't use it
;; until the reset has been done.

(defn pick-bee-by-random []
(defn pick-bee-by-random
[]
(let [hive (wcar {:pool {} :spec {:host "localhost" :port 6379}}
(car/smembers "oss.worker.bees"))]
(rand-nth hive)
))

(defn bees-with-proper-state
[states]
(filter (fn[x] (or (> (epoch-sec) (Integer/parseInt (:X-RateLimit-Reset x)))
(> (Integer/parseInt (:X-RateLimit-Remaining x)) 0)))
(map (fn [[x y]] (parse-string y true)) (partition 2 states)))
)

(defn pick-bee-by-random-with-state []
; (log/info "Random bee pick")
(let [[hive states] (wcar {:pool {} :spec {:host "localhost" :port 6379}}
(car/smembers "oss.worker.bees")
(car/hgetall "oss.worker.bee.state"))
states (filter (fn[x] (or (> (epoch-sec) (Integer/parseInt (:X-RateLimit-Reset x)))
(> (Integer/parseInt (:X-RateLimit-Remaining x)) 0)))
(map (fn [[x y]] (parse-string y true)) (partition 2 states)))
chosen (rand-nth states)]
possible-bees (bees-with-proper-state states)
chosen (rand-nth possible-bees)]
(if chosen
(do
(log/infof "Choses %s" chosen)
; (log/infof "Choses %s" chosen)
(:bee chosen))
(do
(log/info "Rollback to random hive bee")
Expand Down Expand Up @@ -241,9 +257,9 @@
(defn pick-a-bee
[]
(apply (rand-nth [pick-bee-by-rank
pick-bee-by-rank
pick-bee-by-random-with-state
pick-bee-by-random-with-state
pick-bee-by-random
pick-bee-by-rate
pick-bee-by-rate]) []))

;; if the chosen bee has the credit 0
Expand All @@ -252,24 +268,26 @@
"Picks up one of the worker bee as the delegate to fetch the URL and finds the
next link. "
[url & {:keys [bee-picker] :or {bee-picker pick-a-bee}}]

(let [bee (bee-picker) ;
fetcher (fn [url & args] (assign-worker-bee "get-url" bee (codec/url-encode url)
; :bee-picker (fn [] "localhost:10554")
))]
(let [next-step (fetch-url-and-next url {:get-url-fn fetcher})
next-url (:next-url next-step)
_ (log/sometimes 0.9 (log/info bee (:credits next-step)))]
(dd/increment {} "github.calls" 1 {:bee bee :result (not (nil? next-url))})
(credit-report bee (:credits next-step))
(if (nil? next-url)
next-step
(do
(let [next-url* (clean-next-url next-url)
next-step* (assoc next-step :next-url next-url*)]
(log/info (:next-url next-step*))
next-step*
))))))
(let [bee (bee-picker)
url-encoded (try
(codec/url-encode url)
(catch Exception ex
(log/warnf "Failed to URL Encode %s" url )
))]
(when url-encoded
(let [fetcher (fn [url & args] (assign-worker-bee "get-url" bee url-encoded))
next-step (fetch-url-and-next url {:get-url-fn fetcher})
next-url (:next-url next-step)
_ (log/sometimes 0.009 (log/info bee (:credits next-step)))]
(dd/increment {} "github.calls" 1 {:bee bee :result (not (nil? next-url))})
(credit-report bee (:credits next-step))
(if (nil? next-url)
next-step
(do
(let [next-url* (clean-next-url next-url)
next-step* (assoc next-step :next-url next-url*)]
(log/info (:next-url next-step*))
next-step*)))))))


(defn user-data
Expand Down Expand Up @@ -419,7 +437,7 @@
(let [url (format "%s/users/%s?"
ghub-root user-login (env :client-id) (env :client-secret))
response (fetch-url url)]
(log/info response)
; (log/info response)
;; iff the response is {:error 404}
(if (or (= "Empty Response" (get response :reason ))
(= 404 (:error (:data response ))))
Expand Down Expand Up @@ -763,12 +781,22 @@
(recur next-url (inc looped) max)
)))

(defn fetch-url-with-retry
[url]
(let [{:keys [success next-url data] :as result} (fetch-url url)]
(if (nil? next-url)
(do
(log/infof "Retrying %s" url)
(let [{:keys [success next-url data] :as retry-result} (fetch-url url)]
retry-result))
result)))

(defn iterate-projects
[url looped max]
(log/warn (format "[PROJECTSINCE]Loop %d. %s" looped url))
(let [{:keys [success next-url data]} (fetch-url url)]
(let [{:keys [success next-url data]} (fetch-url-with-retry url)]
(insert-projects {} data)
(if (>= looped max)
(if (or (nil? next-url) (>= looped max))
:done
(recur next-url (inc looped) max)
)))
Expand Down
26 changes: 20 additions & 6 deletions src/clj/hsm/tasks/queue.clj
Expand Up @@ -9,24 +9,38 @@
[clj-http.client :as client]
[truckerpath.clj-datadog.core :as dd]
[taoensso.carmine :as car :refer (wcar)]
[raven-clj.core :refer [capture]]
[hsm.conf :as conf]
[raven-clj.interfaces :refer [stacktrace]]
))


(defn exec-with-care
[exec-fn args]
(future
(try
(apply exec-fn args)
(catch Exception ex
(log/error ex)
(log/warnf "Error processing %s %s" exec-fn ex)
(capture (-> @conf/app-conf :data :sentry-dsn )
(-> {:message (format "Processing Exception on %s" exec-fn)}
(stacktrace ex)))
(log/warn "Captured trace")))))

(defn process-msg
[msg]
(let [{:keys [type params]} (parse-string (:body msg) true)]
(log/infof "Processing %s %s" type params)
(try
(condp = type
"update-project" (gh/update-project-remotely params)
"enhance-user" (gh/find-n-update-user nil params true)
"update-project" (future (gh/update-project-remotely params))
"enhance-user" (future (gh/find-n-update-user nil params true))
"sync-user" (future
(gh/sync-some-users {:connection nil} (Integer/parseInt params)))
"sync-single-user" (gh/find-n-update-user {} params)
"import-org-events" (gh/import-org-events params)
"sync-single-user" (future (gh/find-n-update-user {} params))
"import-org-events" (future (gh/import-org-events params))
"iterate-users" (future (gh/iterate-users-since (Long/parseLong params)))
"iterate-projects" (gh/iterate-projects-since (Long/parseLong params))
"iterate-projects" (exec-with-care gh/iterate-projects-since [(Long/parseLong params)])
(log/warn (str "unexpected value ->" type))
)
(catch Exception ex
Expand Down
9 changes: 1 addition & 8 deletions src/clj/hsm/views.clj
Expand Up @@ -53,17 +53,10 @@
[:iframe {:src "http://ghbtns.com/github-btn.html?user=bcambel&repo=oss.io&type=watch&count=true&size=normal"
:allowtransparency true :frameborder 0 :scroling 0 :width "260px" :height "30px"}]
[:hr]
; [:p "Looking for " [:b[:span.red "Python Tutorials? "]] [:br] [:a {:href "/tutorial/?ref=left_menu_link"} "Check these awesome tutorials"]]
; [:hr]
; [:a.twitter-share-button {:href "https://twitter.com/share"
; :data-text (format "Top %s Projects" platform)
; :data-via "pythonhackers" :data-url (format "%s/%s" host page) :data-size :normal
; :data-hashtags "python,hackers,github"
; } "Tell your friends"]
[:a.twitter-follow-button {:href "https://twitter.com/pythonhackers" :data-show-count true :data-size :small }]

[:hr]
[:script#_carbonads_js {:type "text/javascript" :src (format "//cdn.carbonads.com/carbon.js?zoneid=1673&serve=C6AILKT&placement=%s" (get-placement host)) }]
[:script#_carbonads_js {:type "text/javascript" :src (format "//cdn.carbonads.com/carbon.js?zoneid=1673&serve=C6AILKT&placement=ossio") }]
])

(defhtml languages-pane
Expand Down

0 comments on commit cddb8b8

Please sign in to comment.