Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions app/factories/event_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,41 @@ module EventFactory
class << self
# Creates and returns an event from the dequeued message from the events SQS queue.
def create_from_sqs(message)
now = Time.now.utc

Event.new(
uuid: message["uuid"] || SecureRandom.uuid,
subj_id: DoiUtilities.normalize_doi(message["subjId"]) || message["subjId"],
obj_id: DoiUtilities.normalize_doi(message["objId"]) || message["objId"],
source_id: message["sourceId"],
aasm_state: "waiting",
source_token: message["sourceToken"],
created_at: now,
updated_at: now,
total: message["total"] || 1,
occurred_at: message["occurred_at"] || now,
occurred_at: message["occurred_at"] || Time.now.utc,
message_action: "create",
relation_type_id: message["relation_type_id"] || "references",
subj: message["subj"].to_json,
obj: message["obj"].to_json,
license: message["license"] || "https://creativecommons.org/publicdomain/zero/1.0/",
)
end

def update_from_sqs(event, message)
event.uuid = message["uuid"] if message["uuid"].present?
event.source_id = message["sourceId"] if message["sourceId"].present?
event.source_token = message["sourceToken"] if message["sourceToken"].present?
event.total = message["total"] if message["total"].present?
event.occurred_at = message["occurred_at"] if message["occurred_at"].present?
event.relation_type_id = message["relation_type_id"] if message["relation_type_id"].present?
event.subj = message["subj"].to_json if message["subj"].present?
event.obj = message["obj"].to_json if message["obj"].present?
event.license = message["license"] if message["license"].present?

if message["subj_id"].present?
event.subj_id = DoiUtilities.normalize_doi(message["subjId"]) || message["subjId"]
end

if ["obj_id"].present?
event.obj_id = DoiUtilities.normalize_doi(message["objId"]) || message["objId"]
end
end
end
end
9 changes: 5 additions & 4 deletions app/models/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ class Event < ApplicationRecord
attribute :callback, :text
attribute :error_messages, :text
attribute :source_token, :text
attribute :created_at, :datetime
attribute :updated_at, :datetime
attribute :indexed_at, :datetime, default: -> { Time.zone.at(0) }
attribute :occurred_at, :datetime
attribute :message_action, :string, default: "create"
Expand All @@ -36,8 +34,6 @@ class Event < ApplicationRecord
validates :source_id, presence: true
validates :source_token, presence: true
validates :message_action, presence: true, length: { maximum: 191 }
validates :created_at, presence: true
validates :updated_at, presence: true
validates :indexed_at, presence: true

# Getters
Expand All @@ -50,7 +46,12 @@ def obj_hash
end

# Callback Hooks

# We run some special logic in order to set the source and target doi
# and their related relation type ids.
before_validation :set_source_and_target_doi!

# After the event is persisted successfully to the database, we index the event in OpenSearch.
after_commit -> { EventIndexJob.perform_later(self) }

# OpenSearch Mappings
Expand Down
4 changes: 3 additions & 1 deletion app/workers/event_import_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ def create_event(event_data, log_prefix, log_identifier)
def update_event(event, event_data, log_prefix, log_identifier)
Rails.logger.info("#{log_prefix} Update an existing event for #{log_identifier}")

if event.update(data)
EventFactory.update_from_sqs(event, event_data)

if event.save
Rails.logger.info("#{log_prefix} Event successfully updated for #{log_identifier}")
else
Rails.logger.error("#{log_prefix} Updating event failed for #{log_identifier}: #{event.errors.inspect}")
Expand Down