Skip to content

Commit

Permalink
extract AiRubricMetrics class
Browse files Browse the repository at this point in the history
  • Loading branch information
davidsbailey committed May 9, 2024
1 parent a2173da commit c1a53a3
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 88 deletions.
69 changes: 69 additions & 0 deletions dashboard/app/jobs/ai_rubric_metrics.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
class AiRubricMetrics
  # CloudWatch namespace under which all AI rubric metrics are published.
  AI_RUBRIC_METRICS_NAMESPACE = 'AiRubric'.freeze

  # Study name used when recording AI rubric events to Firehose.
  AI_RUBRICS_FIREHOSE_STUDY = 'ai-rubrics'.freeze

  # Publish token-usage metrics from an AI proxy response to CloudWatch.
  #
  # The aiproxy service reports usage inside the response metadata, e.g.:
  #   { metadata: { agent: 'openai', usage: { total_tokens: 1234, prompt_tokens: 432, completion_tokens: 802 } } }
  # Each present counter is emitted under dimensions for the serving agent
  # (openai, etc.) and the current environment; missing counters are skipped.
  #
  # @param [Hash] response The parsed JSON response from the AI proxy.
  def self.log_token_metrics(response)
    %i[TotalTokens PromptTokens CompletionTokens].each do |metric|
      # CamelCase metric name maps to the snake_case key in the usage hash.
      count = response.dig('metadata', 'usage', metric.to_s.underscore)
      next if count.nil?
      serving_agent = response.dig('metadata', 'agent') || 'unknown'
      log_metric(metric_name: metric, agent: serving_agent, value: count)
    end
  end

  # Push a single Count-unit metric to the AiRubric CloudWatch namespace,
  # tagged with the current environment and the serving agent.
  #
  # @param [Symbol] metric_name Name of the metric to record.
  # @param [String, nil] agent AI backend that served the request, if known.
  # @param [Integer] value Metric value; defaults to 1 for simple event counts.
  def self.log_metric(metric_name:, agent: nil, value: 1)
    datum = {
      metric_name: metric_name,
      value: value,
      dimensions: [
        {name: 'Environment', value: CDO.rack_env},
        {name: 'Agent', value: agent},
      ],
      unit: 'Count'
    }
    Cdo::Metrics.push(AI_RUBRIC_METRICS_NAMESPACE, [datum])
  end

  # Record an AI-rubric failure event to the analysis Firehose stream,
  # enriched with context pulled from the job's arguments.
  #
  # @param [ActiveJob::Base] job The job whose first argument hash supplies
  #   user_id, requester_id, and script_level_id.
  # @param [Exception] error The error being reported; its class and message
  #   are written into data_string.
  # @param [String] event_name Event label, e.g. 'rate-limit'.
  # @param [String, nil] agent AI backend involved in the failure, if known.
  def self.log_to_firehose(job:, error:, event_name:, agent: nil)
    job_args = job.arguments.first
    script_level = ScriptLevel.find(job_args[:script_level_id])

    context = {
      user_id: job_args[:user_id],
      requester_id: job_args[:requester_id],
      script_level_id: job_args[:script_level_id],
      script_name: script_level.script.name,
      lesson_number: script_level.lesson.relative_position,
      level_name: script_level.level.name,
      agent: agent
    }

    FirehoseClient.instance.put_record(
      :analysis,
      {
        study: AI_RUBRICS_FIREHOSE_STUDY,
        study_group: 'v0',
        event: event_name,
        data_string: "#{error.class.name}: #{error.message}",
        data_json: context.to_json
      }
    )
  end
end
86 changes: 9 additions & 77 deletions dashboard/app/jobs/evaluate_rubric_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,6 @@ class EvaluateRubricJob < ApplicationJob
ATTEMPTS_ON_SERVICE_UNAVAILABLE = 3
ATTEMPTS_ON_GATEWAY_TIMEOUT = 3

# The CloudWatch metric namespace
AI_RUBRIC_METRICS_NAMESPACE = 'AiRubric'.freeze

# The firehose study name
AI_RUBRICS_FIREHOSE_STUDY = 'ai-rubrics'.freeze

# This is raised if there is any raised error due to a rate limit, e.g. a 429
# received from the aiproxy service.
class TooManyRequestsError < StandardError
Expand Down Expand Up @@ -152,30 +146,30 @@ def initialize(response)

# Retry on any reported rate limit (429 status). With 3 attempts, 'exponentially_longer' waits 3s, then 18s.
retry_on TooManyRequestsError, wait: :exponentially_longer, attempts: ATTEMPTS_ON_RATE_LIMIT do |job, error|
log_metric(metric_name: :RateLimit)
log_to_firehose(job: job, error: error, event_name: 'rate-limit')
AiRubricMetrics.log_metric(metric_name: :RateLimit)
AiRubricMetrics.log_to_firehose(job: job, error: error, event_name: 'rate-limit')
end

# Retry just once on a timeout. It is likely to timeout again.
retry_on Net::ReadTimeout, Timeout::Error, wait: 10.seconds, attempts: ATTEMPTS_ON_TIMEOUT_ERROR do |job, error|
log_metric(metric_name: :TimeoutError)
log_to_firehose(job: job, error: error, event_name: 'timeout-error')
AiRubricMetrics.log_metric(metric_name: :TimeoutError)
AiRubricMetrics.log_to_firehose(job: job, error: error, event_name: 'timeout-error')
end

# Retry on a 503 Service Unavailable error, including those returned by aiproxy
# when openai returns 500.
retry_on ServiceUnavailableError, wait: :exponentially_longer, attempts: ATTEMPTS_ON_SERVICE_UNAVAILABLE do |job, error|
agent = error.message.downcase.include?('openai') ? 'openai' : 'none'
log_metric(metric_name: :ServiceUnavailable, agent: agent)
log_to_firehose(job: job, error: error, event_name: 'service-unavailable', agent: agent)
AiRubricMetrics.log_metric(metric_name: :ServiceUnavailable, agent: agent)
AiRubricMetrics.log_to_firehose(job: job, error: error, event_name: 'service-unavailable', agent: agent)
end

# Retry on a 504 Gateway Timeout error, including those returned by aiproxy
# when openai request times out.
retry_on GatewayTimeoutError, wait: :exponentially_longer, attempts: ATTEMPTS_ON_GATEWAY_TIMEOUT do |job, error|
agent = error.message.downcase.include?('openai') ? 'openai' : 'none'
log_metric(metric_name: :GatewayTimeout, agent: agent)
log_to_firehose(job: job, error: error, event_name: 'gateway-timeout', agent: agent)
AiRubricMetrics.log_metric(metric_name: :GatewayTimeout, agent: agent)
AiRubricMetrics.log_to_firehose(job: job, error: error, event_name: 'gateway-timeout', agent: agent)
end

def perform(user_id:, requester_id:, script_level_id:, rubric_ai_evaluation_id: nil)
Expand Down Expand Up @@ -205,7 +199,7 @@ def perform(user_id:, requester_id:, script_level_id:, rubric_ai_evaluation_id:
response = get_openai_evaluations(openai_params)

# Log tokens and usage information
EvaluateRubricJob.log_token_metrics(response)
AiRubricMetrics.log_token_metrics(response)

# Get and validate the response data
ai_evaluations = response['data']
Expand All @@ -219,27 +213,6 @@ def perform(user_id:, requester_id:, script_level_id:, rubric_ai_evaluation_id:
write_ai_evaluations(user, merged_evaluations, rubric, rubric_ai_evaluation, project_version)
end

# Write out metrics reflected in the response to CloudWatch
#
# Currently, this keeps track of a curated set of metrics returned
# within the 'metadata.usage' field of the returned response from
# the AI proxy service.
#
# @param [Hash] response The parsed JSON response from the AI proxy.
def self.log_token_metrics(response)
# Record the metadata
# The aiproxy service will report the usage in the metadata via:
# { metadata: { agent: 'openai', usage: { total_tokens: 1234, prompt_tokens: 432, completion_tokens: 802 } } }
[:TotalTokens, :PromptTokens, :CompletionTokens].each do |name|
# Send a metric to the AIRubric namespace under categories for both the
# service used (openai, etc) and the current environment.
tokens = response.dig('metadata', 'usage', name.to_s.underscore)
next if tokens.nil?
agent = response.dig('metadata', 'agent') || 'unknown'
log_metric(metric_name: name, agent: agent, value: tokens)
end
end

# Ensure that the RubricAiEvaluation exists as an argument to the job
private def pass_in_or_create_rubric_ai_evaluation(job)
# Get the first argument to perform() which is the hash of named arguments
Expand Down Expand Up @@ -272,47 +245,6 @@ def self.log_token_metrics(response)
rubric_ai_evaluation
end

def self.log_metric(metric_name:, agent: nil, value: 1)
Cdo::Metrics.push(
AI_RUBRIC_METRICS_NAMESPACE,
[
{
metric_name: metric_name,
value: value,
dimensions: [
{name: 'Environment', value: CDO.rack_env},
{name: 'Agent', value: agent},
],
unit: 'Count'
}
]
)
end

def self.log_to_firehose(job:, error:, event_name:, agent: nil)
options = job.arguments.first
script_level = ScriptLevel.find(options[:script_level_id])

FirehoseClient.instance.put_record(
:analysis,
{
study: AI_RUBRICS_FIREHOSE_STUDY,
study_group: 'v0',
event: event_name,
data_string: "#{error.class.name}: #{error.message}",
data_json: {
user_id: options[:user_id],
requester_id: options[:requester_id],
script_level_id: options[:script_level_id],
script_name: script_level.script.name,
lesson_number: script_level.lesson.relative_position,
level_name: script_level.level.name,
agent: agent
}.to_json
}
)
end

# get the channel id of the project which stores the user's code on this script level.
private def get_channel_id(user, script_level)
# get the user's storage id from the database
Expand Down
22 changes: 11 additions & 11 deletions dashboard/test/jobs/evaluate_rubric_job_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class EvaluateRubricJobTest < ActiveJob::TestCase

# ensure RateLimit metric is logged
Cdo::Metrics.expects(:push).with(
EvaluateRubricJob::AI_RUBRIC_METRICS_NAMESPACE,
AiRubricMetrics::AI_RUBRIC_METRICS_NAMESPACE,
all_of(
includes_metrics(RateLimit: 1),
includes_dimensions(:RateLimit, Environment: CDO.rack_env)
Expand All @@ -241,7 +241,7 @@ class EvaluateRubricJobTest < ActiveJob::TestCase

# ensure firehose event is logged
FirehoseClient.instance.expects(:put_record).with do |stream, data|
data[:study] == EvaluateRubricJob::AI_RUBRICS_FIREHOSE_STUDY &&
data[:study] == AiRubricMetrics::AI_RUBRICS_FIREHOSE_STUDY &&
data[:event] == 'rate-limit' &&
JSON.parse(data[:data_json])['agent'].nil? &&
stream == :analysis
Expand Down Expand Up @@ -282,7 +282,7 @@ class EvaluateRubricJobTest < ActiveJob::TestCase

# ensure TimeoutError metric is logged
Cdo::Metrics.expects(:push).with(
EvaluateRubricJob::AI_RUBRIC_METRICS_NAMESPACE,
AiRubricMetrics::AI_RUBRIC_METRICS_NAMESPACE,
all_of(
includes_metrics(TimeoutError: 1),
includes_dimensions(:TimeoutError, Environment: CDO.rack_env)
Expand All @@ -291,7 +291,7 @@ class EvaluateRubricJobTest < ActiveJob::TestCase

# ensure firehose event is logged
FirehoseClient.instance.expects(:put_record).with do |stream, data|
data[:study] == EvaluateRubricJob::AI_RUBRICS_FIREHOSE_STUDY &&
data[:study] == AiRubricMetrics::AI_RUBRICS_FIREHOSE_STUDY &&
data[:event] == 'timeout-error' &&
JSON.parse(data[:data_json])['agent'].nil? &&
stream == :analysis
Expand Down Expand Up @@ -335,7 +335,7 @@ class EvaluateRubricJobTest < ActiveJob::TestCase

# ensure ServiceUnavailable metric is logged
Cdo::Metrics.expects(:push).with(
EvaluateRubricJob::AI_RUBRIC_METRICS_NAMESPACE,
AiRubricMetrics::AI_RUBRIC_METRICS_NAMESPACE,
all_of(
includes_metrics(ServiceUnavailable: 1),
includes_dimensions(:ServiceUnavailable, Environment: CDO.rack_env, Agent: 'openai')
Expand All @@ -344,7 +344,7 @@ class EvaluateRubricJobTest < ActiveJob::TestCase

# ensure firehose event is logged
FirehoseClient.instance.expects(:put_record).with do |stream, data|
data[:study] == EvaluateRubricJob::AI_RUBRICS_FIREHOSE_STUDY &&
data[:study] == AiRubricMetrics::AI_RUBRICS_FIREHOSE_STUDY &&
data[:event] == 'service-unavailable' &&
JSON.parse(data[:data_json])['agent'] == 'openai' &&
stream == :analysis
Expand Down Expand Up @@ -388,7 +388,7 @@ class EvaluateRubricJobTest < ActiveJob::TestCase

# ensure GatewayTimeout metric is logged
Cdo::Metrics.expects(:push).with(
EvaluateRubricJob::AI_RUBRIC_METRICS_NAMESPACE,
AiRubricMetrics::AI_RUBRIC_METRICS_NAMESPACE,
all_of(
includes_metrics(GatewayTimeout: 1),
includes_dimensions(:GatewayTimeout, Environment: CDO.rack_env, Agent: 'openai')
Expand All @@ -397,7 +397,7 @@ class EvaluateRubricJobTest < ActiveJob::TestCase

# ensure firehose event is logged
FirehoseClient.instance.expects(:put_record).with do |stream, data|
data[:study] == EvaluateRubricJob::AI_RUBRICS_FIREHOSE_STUDY &&
data[:study] == AiRubricMetrics::AI_RUBRICS_FIREHOSE_STUDY &&
data[:event] == 'gateway-timeout' &&
JSON.parse(data[:data_json])['agent'] == 'openai' &&
stream == :analysis
Expand Down Expand Up @@ -467,23 +467,23 @@ class EvaluateRubricJobTest < ActiveJob::TestCase

# Expect metrics to be logged for the AI evaluation
Cdo::Metrics.expects(:push).with(
EvaluateRubricJob::AI_RUBRIC_METRICS_NAMESPACE,
AiRubricMetrics::AI_RUBRIC_METRICS_NAMESPACE,
all_of(
includes_metrics(TotalTokens: 123),
includes_dimensions(:TotalTokens, Environment: CDO.rack_env, Agent: 'openai')
)
)

Cdo::Metrics.expects(:push).with(
EvaluateRubricJob::AI_RUBRIC_METRICS_NAMESPACE,
AiRubricMetrics::AI_RUBRIC_METRICS_NAMESPACE,
all_of(
includes_metrics(CompletionTokens: 100),
includes_dimensions(:CompletionTokens, Environment: CDO.rack_env, Agent: 'openai')
)
)

Cdo::Metrics.expects(:push).with(
EvaluateRubricJob::AI_RUBRIC_METRICS_NAMESPACE,
AiRubricMetrics::AI_RUBRIC_METRICS_NAMESPACE,
all_of(
includes_metrics(PromptTokens: 23),
includes_dimensions(:PromptTokens, Environment: CDO.rack_env, Agent: 'openai')
Expand Down

0 comments on commit c1a53a3

Please sign in to comment.