Skip to content

Commit

Permalink
Merge pull request #536 from inaturalist/webscale_features
Browse files Browse the repository at this point in the history
Notifications that use the new elasticsearch-based index.
  • Loading branch information
kueda committed Apr 27, 2015
2 parents 2e1422a + c2de4aa commit a59755b
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 48 deletions.
2 changes: 1 addition & 1 deletion app/controllers/observations_controller.rb
Expand Up @@ -1964,7 +1964,7 @@ def user_viewed_updates
"resource_type = 'Observation' AND resource_id = ? AND subscriber_id = ?",
@observation.id, current_user.id])
updates_scope.update_all(viewed_at: Time.now)
Update.elastic_index!(scope: updates_scope)
Update.elastic_index!(scope: updates_scope, delay: true)
end

def stats_adequately_scoped?
Expand Down
51 changes: 29 additions & 22 deletions app/controllers/users_controller.rb
Expand Up @@ -81,13 +81,13 @@ def activate
# Don't take these out yet, useful for admin user management down the road

def suspend
@user.suspend!
@user.suspend!
flash[:notice] = t(:the_user_x_has_been_suspended, :user => @user.login)
redirect_back_or_default(@user)
end

def unsuspend
@user.unsuspend!
@user.unsuspend!
flash[:notice] = t(:the_user_x_has_been_unsuspended, :user => @user.login)
redirect_back_or_default(@user)
end
Expand Down Expand Up @@ -280,11 +280,21 @@ def relationships
end

def dashboard
@pagination_updates = current_user.updates.limit(50).order("id DESC").includes(:resource, :notifier, :subscriber, :resource_owner)
@pagination_updates = @pagination_updates.where("id < ?", params[:from].to_i) if params[:from]
@pagination_updates = @pagination_updates.where(:notifier_type => params[:notifier_type]) unless params[:notifier_type].blank?
@pagination_updates = @pagination_updates.where(:resource_owner_id => current_user) if params[:filter] == "you"
filters = [ ]
wheres = { }
if params[:from]
filters << { range: { id: { lt: params[:from] } } }
end
unless params[:notifier_type].blank?
wheres[:notifier_type] = params[:notifier_type]
end
if params[:filter] == "you"
wheres[:resource_owner_id] = current_user.id
end
@pagination_updates = current_user.recent_notifications(
filters: filters, wheres: wheres, per_page: 50)
@updates = Update.load_additional_activity_updates(@pagination_updates)
Update.preload_associations(@updates, [ :resource, :notifier, :subscriber, :resource_owner ])
@update_cache = Update.eager_load_associates(@updates)
@grouped_updates = Update.group_and_sort(@updates, :update_cache => @update_cache, :hour_groups => true)
Update.user_viewed_updates(@pagination_updates)
Expand Down Expand Up @@ -315,32 +325,29 @@ def updates_count
end

def new_updates
@updates = current_user.updates.unviewed.activity.
includes(:resource, :notifier, :subscriber, :resource_owner).
order("id DESC").
limit(200)
wheres = { notification: :activity }
notifier_types = [(params[:notifier_types] || params[:notifier_type])].compact
unless notifier_types.blank?
notifier_types = notifier_types.map{|t| t.split(',')}.flatten.compact.uniq
wheres[:notifier_type] = notifier_types.map(&:downcase)
end
unless params[:resource_type].blank?
wheres[:resource_type] = params[:resource_type].downcase
end
@updates = current_user.recent_notifications(unviewed: true, per_page: 200, wheres: wheres)
unless request.format.json?
if @updates.count == 0
@updates = current_user.updates.activity.
includes(:resource, :notifier, :subscriber, :resource_owner).
order("id DESC").
limit(10).
where("viewed_at > ?", 1.day.ago)
@updates = current_user.recent_notifications(viewed: true, per_page: 10, wheres: wheres)
end
if @updates.count == 0
@updates = current_user.updates.activity.limit(5).order("id DESC")
@updates = current_user.recent_notifications(per_page: 5, wheres: wheres)
end
end
notifier_types = [(params[:notifier_types] || params[:notifier_type])].compact
unless notifier_types.blank?
notifier_types = notifier_types.map{|t| t.split(',')}.flatten.compact.uniq
@updates = @updates.where("notifier_type IN (?)", notifier_types)
end
@updates = @updates.where(:resource_type => params[:resource_type]) unless params[:resource_type].blank?
if !%w(1 yes y true t).include?(params[:skip_view].to_s)
Update.user_viewed_updates(@updates)
session[:updates_count] = 0
end
Update.preload_associations(@updates, [ :resource, :notifier, :subscriber, :resource_owner ])
@update_cache = Update.eager_load_associates(@updates)
@updates = @updates.sort_by{|u| u.created_at.to_i * -1}
respond_to do |format|
Expand Down
60 changes: 35 additions & 25 deletions app/models/update.rb
Expand Up @@ -175,17 +175,25 @@ def self.eager_load_associates(updates, options = {})
:project_invitation => [:project, :user],
:taxon_change => [:taxon, {:taxon_change_taxa => [:taxon]}, :user]
}
if includes[:observation]
if includes[:identification]
includes[:observation] << { identifications: includes[:identification] }
end
if includes[:comment]
includes[:observation] << { comments: includes[:comment] }
end
end
update_cache = {}
klasses = [
Comment,
Flag,
Identification,
ListedTaxon,
Observation,
Comment,
Flag,
Identification,
ListedTaxon,
Observation,
ObservationField,
Post,
Project,
ProjectInvitation,
Post,
Project,
ProjectInvitation,
Taxon,
TaxonChange,
User
Expand Down Expand Up @@ -214,25 +222,27 @@ def self.user_viewed_updates(updates)
updates = updates.to_a.compact
return if updates.blank?
subscriber_id = updates.first.subscriber_id
# using a transaction here to speed up the multiple inserts
Update.transaction do
# mark all as viewed
updates_scope = Update.where(id: updates)
updates_scope.update_all(viewed_at: Time.now)
Update.elastic_index!(scope: updates_scope)

# mark all as viewed
updates_scope = Update.where(id: updates)
updates_scope.update_all(viewed_at: Time.now)
Update.elastic_index!(scope: updates_scope)

# delete PAST activity updates that were not in this batch
clauses = []
update_ids = []
updates.each do |update|
next unless update.notification == 'activity'
Update.delay(:priority => USER_INTEGRITY_PRIORITY).delete_all([
"id < ? AND notification = 'activity' AND subscriber_id = ? AND resource_type = ? AND resource_id = ?",
update.id, update.subscriber_id, update.resource_type, update.resource_id
])
# delete PAST activity updates that were not in this batch
clauses = []
update_ids = []
updates.each do |update|
next unless update.notification == 'activity'
Update.delay(:priority => USER_INTEGRITY_PRIORITY).delete_all([
"id < ? AND notification = 'activity' AND subscriber_id = ? AND resource_type = ? AND resource_id = ?",
update.id, update.subscriber_id, update.resource_type, update.resource_id
])
end
Update.delay(priority: USER_INTEGRITY_PRIORITY, queue: "slow",
run_at: 6.hours.from_now, unique_hash: { "Update::sweep_for_user": subscriber_id }).
sweep_for_user(subscriber_id)
end
Update.delay(priority: USER_INTEGRITY_PRIORITY, queue: "slow",
run_at: 6.hours.from_now, unique_hash: { "Update::sweep_for_user": subscriber_id }).
sweep_for_user(subscriber_id)
end

def self.sweep_for_user(user_id)
Expand Down
16 changes: 16 additions & 0 deletions app/models/user.rb
Expand Up @@ -608,6 +608,22 @@ def set_community_taxa_if_pref_changed
true
end

def recent_notifications(options={})
options[:filters] ||= [ ]
options[:wheres] ||= { }
options[:per_page] ||= 10
if options[:unviewed]
options[:filters] << { not: { exists: { field: :viewed_at } } }
elsif options[:viewed]
options[:filters] << { range: { viewed_at: { gt: 1.day.ago } } }
end
Update.elastic_paginate(
where: options[:wheres].merge({ subscriber_id: id }),
filters: options[:filters],
per_page: options[:per_page],
sort: { id: :desc })
end

def self.default_json_options
{
:except => [:crypted_password, :salt, :old_preferences, :activation_code, :remember_token, :last_ip,
Expand Down

0 comments on commit a59755b

Please sign in to comment.