Skip to content

Commit

Permalink
[PART 1] Create a "priority only" distribution (#14745)
Browse files Browse the repository at this point in the history
* Priority distributions wip

* Fix tests

* make docs!

* Code climate

* Allow unlimited priority legacy appeals to be distributed

* Fix bug on wrong number of hearing cases distributed

* Create priority only distributions

* Suggestions

* Update to priority_push

* Update to priority_push

* Fix schema

* Code Climate

* Updates

* Updates
 to priority push

* fix specs

* Allow unlimited priority legacy appeals to be distributed

* Fix bug on wrong number of hearing cases distributed

* Create priority only distributions

* Code Climate

* Updates

* Updates
 to priority push

* fix specs

* Rename ama distribution

* Add Yoom's changes

* Fix merge

* Code climate fixes

* Return the number of appeals acutally distributed and fix cc

* Code climate

* Feedback!

* Fix tests

* feedback

* Fix tests

* Test fix

* Fix tests

* [PART 2] Job to push priority cases to judges (#14715)

* WIP with tests on priority push job

* words words words

* Add test to track number converge over time

* Distribute all cases tied to judges

* Add test for pulling all judges with ready priority cases

* Fix merge

* Fix merge

* Tests and renaming

* only distribute tied appeals to eligible judges

* Create distributions for all judges, even if they have no cases tied to them

* Fix job

* Code climate

* Update scope name

* Feedbackgit stgit stgit st!

* Better comments

* Fix test

* [PART 3] Add report on priority push job completion (#14761)

* Add reporting

* Update reporting format to be a string

* Remove redundant VACOLS::CaseDocket::

* Pretty colors

* Add priority target and previous distributions to the report

* Ordering and formatting

* Fix report

* Fix report

* Fix tests

* Code climate

* [Part 4] Create non default distribution seed data (#14802)

* Better handling of dispatch judge and atty

* Fix factories to be able to be called after resetting sequences

* Add distribution seed file

* Add a case that will error

* Integrate bug fix

* Fix job

* Fixes

* Fix report

* Fix tied to judge trait

* User random keys

* Integrate changes from fix

* Fix tests

* Code climate

* Feedback

* Update priority_distributions.rb

* Fix merge

* cc

* Tests

* cc

* Fix tests

* Nocov on seeds that do not run by default
  • Loading branch information
hschallhorn committed Sep 15, 2020
1 parent 7ee8dd8 commit e6279de
Show file tree
Hide file tree
Showing 23 changed files with 2,189 additions and 385 deletions.
1 change: 1 addition & 0 deletions app/controllers/api/v1/jobs_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class Api::V1::JobsController < Api::ApplicationController
"nightly_syncs" => NightlySyncsJob,
"out_of_service_reminder" => OutOfServiceReminderJob,
"prepare_establish_claim" => PrepareEstablishClaimTasksJob,
"push_priority_appeals_to_judges" => PushPriorityAppealsToJudgesJob,
"reassign_old_tasks" => ReassignOldTasksJob,
"retrieve_documents_for_reader" => RetrieveDocumentsForReaderJob,
"set_appeal_age_aod" => SetAppealAgeAodJob,
Expand Down
150 changes: 150 additions & 0 deletions app/jobs/push_priority_appeals_to_judges_job.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# frozen_string_literal: true

# Job that pushes priority cases to a judge rather than waiting for them to request cases. This will distribute cases
# to all judges whose teams that have `accepts_priority_pushed_cases` enabled. The first step distributes all priority
# cases tied to a judge without limit. The second step distributes remaining general population cases (cases not tied to
# an active judge) while attempting to even out the number of priority cases all judges have received over one month.
class PushPriorityAppealsToJudgesJob < CaseflowJob
queue_with_priority :low_priority
application_attr :queue

include AutomaticCaseDistribution

def perform
@tied_distributions = distribute_non_genpop_priority_appeals
@genpop_distributions = distribute_genpop_priority_appeals
send_job_report
end

def send_job_report
slack_service.send_notification(slack_report.join("\n"), self.class.name, "#appeals-job-alerts")
datadog_report_runtime(metric_group_name: "priority_appeal_push_job")
end

def slack_report
report = []
report << "*Number of cases tied to judges distributed*: " \
"#{@tied_distributions.map { |distribution| distribution.statistics['batch_size'] }.sum}"
report << "*Number of general population cases distributed*: " \
"#{@genpop_distributions.map { |distribution| distribution.statistics['batch_size'] }.sum}"

appeals_not_distributed = docket_coordinator.dockets.map do |docket_type, docket|
report << "*Age of oldest #{docket_type} case*: #{docket.oldest_priority_appeal_days_waiting} days"
[docket_type, docket.ready_priority_appeal_ids]
end.to_h

report << "*Number of appeals _not_ distributed*: #{appeals_not_distributed.values.flatten.count}"

report << ""
report << "*Debugging information*"
report << "Priority Target: #{priority_target}"
report << "Previous monthly distributions: #{priority_distributions_this_month_for_eligible_judges}"

if appeals_not_distributed.values.flatten.any?
add_stuck_appeals_to_report(report, appeals_not_distributed)
end

report
end

def add_stuck_appeals_to_report(report, appeals)
report.unshift("[WARN]")
report << "Legacy appeals not distributed: `LegacyAppeal.where(vacols_id: #{appeals[:legacy]})`"
report << "AMA appeals not distributed: `Appeal.where(uuid: #{appeals.values.drop(1).flatten})`"
report << COPY::PRIORITY_PUSH_WARNING_MESSAGE
end

# Distribute all priority cases tied to a judge without limit
def distribute_non_genpop_priority_appeals
eligible_judges.map do |judge|
Distribution.create!(judge: User.find(judge.id), priority_push: true).tap(&:distribute!)
end
end

# Distribute remaining general population cases while attempting to even out the number of priority cases all judges
# have received over one month
def distribute_genpop_priority_appeals
eligible_judge_target_distributions_with_leftovers.map do |judge_id, target|
Distribution.create!(
judge: User.find(judge_id),
priority_push: true
).tap { |distribution| distribution.distribute!(target) }
end
end

# Give any leftover cases to judges with the lowest distribution targets. Remove judges with 0 cases to be distributed
# as these are the final counts to distribute remaining ready priority cases
def eligible_judge_target_distributions_with_leftovers
leftover_cases = leftover_cases_count
target_distributions_for_eligible_judges.sort_by(&:last).map do |judge_id, target|
if leftover_cases > 0
leftover_cases -= 1
target += 1
end
(target > 0) ? [judge_id, target] : nil
end.compact.to_h
end

# Because we cannot distribute fractional cases, there can be cases leftover after taking the priority target
# into account. This number will always be less than the number of judges that need distribution because division
def leftover_cases_count
ready_priority_appeals_count - target_distributions_for_eligible_judges.values.sum
end

# Calculate the number of cases a judge should receive based on the priority target. Don't toss out judges with 0 as
# they could receive some of the leftover cases (if any)
def target_distributions_for_eligible_judges
priority_distributions_this_month_for_eligible_judges.map do |judge_id, distributions_this_month|
target = priority_target - distributions_this_month
(target >= 0) ? [judge_id, target] : nil
end.compact.to_h
end

# Calculates a target that will distribute all ready appeals so the remaining counts for each judge will produce
# even case counts over a full month (or as close as we can get to it)
def priority_target
@priority_target ||= begin
distribution_counts = priority_distributions_this_month_for_eligible_judges.values
target = (distribution_counts.sum + ready_priority_appeals_count) / distribution_counts.count

# If there are any judges that have previous distributions that are MORE than the currently calculated priority
# target, no target will be large enough to get all other judges up to their number of cases. Remove them from
# consideration and recalculate the target for all other judges.
while distribution_counts.any? { |distribution_count| distribution_count > target }
distribution_counts = distribution_counts.reject { |distribution_count| distribution_count > target }
target = (distribution_counts.sum + ready_priority_appeals_count) / distribution_counts.count
end

target
end
end

def docket_coordinator
@docket_coordinator ||= DocketCoordinator.new
end

def ready_priority_appeals_count
@ready_priority_appeals_count ||= docket_coordinator.priority_count
end

# Number of priority distributions every eligible judge has received in the last month
def priority_distributions_this_month_for_eligible_judges
eligible_judges.map { |judge| [judge.id, priority_distributions_this_month_for_all_judges[judge.id] || 0] }.to_h
end

def eligible_judges
@eligible_judges ||= JudgeTeam.pushed_priority_cases_allowed.map(&:judge)
end

# Produces a hash of judge_id and the number of cases distributed to them in the last month
def priority_distributions_this_month_for_all_judges
@priority_distributions_this_month_for_all_judges ||= priority_distributions_this_month
.pluck(:judge_id, :statistics)
.group_by(&:first)
.map { |judge_id, arr| [judge_id, arr.flat_map(&:last).map { |stats| stats["batch_size"] }.sum] }.to_h
end

def priority_distributions_this_month
Distribution.priority_pushed.completed.where(completed_at: 30.days.ago..Time.zone.now)
end
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# frozen_string_literal: true

module AmaCaseDistribution
module AutomaticCaseDistribution
extend ActiveSupport::Concern

delegate :dockets,
Expand All @@ -17,7 +17,21 @@ def docket_coordinator
@docket_coordinator ||= DocketCoordinator.new
end

def ama_distribution
def priority_push_distribution(limit = nil)
@appeals = []
@rem = 0

if limit.nil?
# Distribute priority appeals that are tied to judges (not genpop) with no limit.
distribute_appeals(:legacy, nil, priority: true, genpop: "not_genpop")
distribute_appeals(:hearing, nil, priority: true, genpop: "not_genpop")
else
# Distribute <limit> number of cases, regardless of docket type, oldest first.
distribute_limited_priority_appeals_from_all_dockets(limit)
end
end

def requested_distribution
@appeals = []
@rem = batch_size
@remaining_docket_proportions = docket_proportions.clone
Expand All @@ -40,9 +54,7 @@ def ama_distribution

# If we haven't yet met the priority target, distribute additional priority appeals.
priority_rem = (priority_target - @appeals.count(&:priority)).clamp(0, @rem)
oldest_priority_appeals_by_docket(priority_rem).each do |docket, n|
distribute_appeals(docket, n, priority: true)
end
distribute_limited_priority_appeals_from_all_dockets(priority_rem)

# As we may have already distributed nonpriority legacy and hearing docket cases, we adjust the docket proportions.
deduct_distributed_actuals_from_remaining_docket_proportions(:legacy, :hearing)
Expand All @@ -51,15 +63,20 @@ def ama_distribution
# If a docket runs out of available appeals, we reallocate its cases to the other dockets.
until @rem == 0 || @remaining_docket_proportions.all_zero?
distribute_appeals_according_to_remaining_docket_proportions
@nonpriority_iterations += 1
end

@appeals
end

def distribute_limited_priority_appeals_from_all_dockets(limit)
num_oldest_priority_appeals_by_docket(limit).each do |docket, number_of_appeals_to_distribute|
distribute_appeals(docket, number_of_appeals_to_distribute, priority: true)
end
end

def ama_statistics
{
batch_size: batch_size,
batch_size: @appeals.count,
total_batch_size: total_batch_size,
priority_count: priority_count,
direct_review_due_count: direct_review_due_count,
Expand All @@ -74,14 +91,17 @@ def ama_statistics
}
end

def distribute_appeals(docket, num, priority: false, genpop: "any", range: nil, bust_backlog: false)
return [] unless num > 0
# Handles the distribution of appeals from any docket while tracking appeals distributed and the remaining number of
# appeals to distribute. A nil limit will distribute an infinate number of appeals, only to be used for non_genpop
# distributions (distributions tied to a judge)
def distribute_appeals(docket, limit = nil, priority: false, genpop: "any", range: nil, bust_backlog: false)
return [] unless limit.nil? || limit > 0

if range.nil? && !bust_backlog
appeals = dockets[docket].distribute_appeals(self, priority: priority, genpop: genpop, limit: num)
appeals = dockets[docket].distribute_appeals(self, priority: priority, genpop: genpop, limit: limit)
elsif docket == :legacy && priority == false
appeals = dockets[docket].distribute_nonpriority_appeals(
self, genpop: genpop, range: range, limit: num, bust_backlog: bust_backlog
self, genpop: genpop, range: range, limit: limit, bust_backlog: bust_backlog
)
else
fail "'range' and 'bust_backlog' are only valid arguments when distributing nonpriority, legacy appeals"
Expand All @@ -106,12 +126,13 @@ def deduct_distributed_actuals_from_remaining_docket_proportions(*dockets)
end

def distribute_appeals_according_to_remaining_docket_proportions
@nonpriority_iterations += 1
@remaining_docket_proportions
.normalize!
.stochastic_allocation(@rem)
.each do |docket, n|
appeals = distribute_appeals(docket, n, priority: false)
@remaining_docket_proportions[docket] = 0 if appeals.count < n
.each do |docket, number_of_appeals_to_distribute|
appeals = distribute_appeals(docket, number_of_appeals_to_distribute, priority: false)
@remaining_docket_proportions[docket] = 0 if appeals.count < number_of_appeals_to_distribute
end
end

Expand All @@ -128,13 +149,14 @@ def legacy_docket_range
(docket_margin_net_of_priority * docket_proportions[:legacy]).round
end

def oldest_priority_appeals_by_docket(num)
def num_oldest_priority_appeals_by_docket(num)
return {} unless num > 0

dockets
.flat_map { |sym, docket| docket.age_of_n_oldest_priority_appeals(num).map { |age| [age, sym] } }
.sort_by { |a| a[0] }
.sort_by { |age, _| age }
.first(num)
.each_with_object(Hash.new(0)) { |a, counts| counts[a[1]] += 1 }
.group_by { |_, sym| sym }
.transform_values(&:count)
end
end
8 changes: 5 additions & 3 deletions app/models/distribution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

class Distribution < CaseflowRecord
include ActiveModel::Serializers::JSON
include AmaCaseDistribution
include AutomaticCaseDistribution

has_many :distributed_cases
belongs_to :judge, class_name: "User"
Expand All @@ -20,13 +20,15 @@ class Distribution < CaseflowRecord
CASES_PER_ATTORNEY = 3
ALTERNATIVE_BATCH_SIZE = 15

scope :priority_pushed, -> { where(priority_push: true) }

class << self
def pending_for_judge(judge)
where(status: %w[pending started], judge: judge)
end
end

def distribute!
def distribute!(limit = nil)
return unless %w[pending error].include? status

if status == "error"
Expand All @@ -41,7 +43,7 @@ def distribute!
multi_transaction do
ActiveRecord::Base.connection.execute "SET LOCAL statement_timeout = #{transaction_time_out}"

ama_distribution
priority_push? ? priority_push_distribution(limit) : requested_distribution

update!(status: "completed", completed_at: Time.zone.now, statistics: ama_statistics)
end
Expand Down
11 changes: 11 additions & 0 deletions app/models/docket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ def age_of_n_oldest_priority_appeals(num)
appeals(priority: true, ready: true).limit(num).map(&:ready_for_distribution_at)
end

def oldest_priority_appeal_days_waiting
oldest_appeal_ready_at = age_of_n_oldest_priority_appeals(1).first
return 0 if oldest_appeal_ready_at.nil?

(Time.zone.now.to_date - oldest_appeal_ready_at.to_date).to_i
end

def ready_priority_appeal_ids
appeals(priority: true, ready: true).pluck(:uuid)
end

# rubocop:disable Lint/UnusedMethodArgument
def distribute_appeals(distribution, priority: false, genpop: nil, limit: 1)
Distribution.transaction do
Expand Down
10 changes: 10 additions & 0 deletions app/models/dockets/hearing_request_docket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,18 @@ def distribute_appeals(distribution, priority: false, genpop: "any", limit: 1)
base_relation: base_relation, genpop: genpop, judge: distribution.judge
).call

appeals = self.class.limit_genpop_appeals(appeals, limit) if genpop.eql? "any"

HearingRequestCaseDistributor.new(
appeals: appeals, genpop: genpop, distribution: distribution, priority: priority
).call
end

def self.limit_genpop_appeals(appeals_array, limit)
# genpop 'any' returns 2 arrays of the limited base relation. This means if we only request 2 cases, appeals is a
# 2x2 array containing 4 cases overall and we will end up distributing 4 cases rather than 2. Instead, reinstate the
# limit here by filtering out the newest cases
appeals_to_reject = appeals_array.flatten.sort_by(&:ready_for_distribution_at).drop(limit)
appeals_array.map { |appeals| appeals - appeals_to_reject }
end
end
11 changes: 11 additions & 0 deletions app/models/dockets/legacy_docket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ def weight
count(priority: false) + nod_count * NOD_ADJUSTMENT
end

def ready_priority_appeal_ids
VACOLS::CaseDocket.priority_ready_appeal_vacols_ids
end

def oldest_priority_appeal_days_waiting
oldest_appeal_ready_at = age_of_n_oldest_priority_appeals(1).first
return 0 if oldest_appeal_ready_at.nil?

(Time.zone.now.to_date - oldest_appeal_ready_at.to_date).to_i
end

def age_of_n_oldest_priority_appeals(num)
LegacyAppeal.repository.age_of_n_oldest_priority_appeals(num)
end
Expand Down
2 changes: 1 addition & 1 deletion app/models/organizations/judge_team.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# frozen_string_literal: true

class JudgeTeam < Organization
scope :pushed_priority_cases_allowed, -> { where(accepts_priority_pushed_cases: true) }
scope :pushed_priority_cases_allowed, -> { active.where(accepts_priority_pushed_cases: true) }

class << self
def for_judge(user)
Expand Down
Loading

0 comments on commit e6279de

Please sign in to comment.