Skip to content

Commit

Permalink
Merge branch 'master' into andrew-newell-patch-1
Browse files Browse the repository at this point in the history
  • Loading branch information
scytherswings committed Jan 31, 2020
2 parents b1edd01 + 5e8a293 commit 70a71a4
Show file tree
Hide file tree
Showing 23 changed files with 520 additions and 44 deletions.
5 changes: 4 additions & 1 deletion .codeclimate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ version: "2"
checks:
argument-count:
config:
threshold: 5
threshold: 5

exclude_patterns:
- "examples/**/*"
6 changes: 5 additions & 1 deletion .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ Metrics/BlockLength:
# TODO: Evaluate if this is an acceptable parameter list. It's a constructor after all.
Metrics/ParameterLists:
Exclude:
- lib/pgdice/table.rb
- lib/pgdice/table.rb

AllCops:
Exclude:
- 'examples/**/*'
32 changes: 19 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,6 @@ PgDice is intended to be used by scheduled background jobs in frameworks like [S
where logging and clear exception messages are crucial.


## Disclaimer

There are some features in this gem which allow you to drop database tables.

If you choose to use this software without a __tested and working__ backup and restore strategy in place then you
are a fool and will pay the price for your negligence. THIS SOFTWARE IS PROVIDED "AS IS",
WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED. By using this software you agree that the creator,
maintainers and any affiliated parties CANNOT BE HELD LIABLE FOR DATA LOSS OR LOSSES OF ANY KIND.

See the [LICENSE](LICENSE) for more information.


# Installation

Add this line to your application's Gemfile:
Expand Down Expand Up @@ -279,6 +267,12 @@ PgDice.list_droppable_partitions('comments', past: 60)
```
This example would use `60` instead of the configured value of `90` from the `comments` table we configured above.

# Examples

1. [Here's an example on how to use PgDice in AWS](examples/aws) and the [README](examples/aws/README.md) which will guide
you through what is going on.

1. [Here's an example on how to write a config.yml for PgDice](examples/config.yml)

# FAQ

Expand All @@ -292,7 +286,7 @@ This example would use `60` instead of the configured value of `90` from the `co
password = config[Rails.env]["password"]

"postgres://#{username}:#{password}@#{host}/#{database}"
end
end
```

1. I'm seeing off-by-one errors for my `assert_tables` calls?
Expand All @@ -305,6 +299,8 @@ end
1. Non time-range based partitioning. [PgParty](https://github.com/rkrage/pg_party) might be a good option!




# Development

After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake test` to run the tests.
Expand Down Expand Up @@ -336,6 +332,16 @@ to be a safe, welcoming space for collaboration, and contributors are expected t

The gem is available as open source under the terms of the [MIT License](https://opensource.org/licenses/MIT).

# Disclaimer

There are some features in this gem which allow you to drop database tables.

If you choose to use this software without a __tested and working__ backup and restore strategy in place then you
are a fool and will pay the price for your negligence. THIS SOFTWARE IS PROVIDED "AS IS",
WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED. By using this software you agree that the creator,
maintainers and any affiliated parties CANNOT BE HELD LIABLE FOR DATA LOSS OR LOSSES OF ANY KIND.

See the [LICENSE](LICENSE) for more information.

# Code of Conduct

Expand Down
28 changes: 28 additions & 0 deletions examples/aws/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# How can I use PgDice in production?

This collection of files is how I use PgDice in production. I'll describe the architecture here so you'll have a place
to start.

1. `tasks/poll_sqs.rake` is run using some sort of process manager like systemd on the ec2 instance. I like to run
the poll_sqs stuff on my Sidekiq instances because they are the ones who eventually handle the work anyway.

1. `lib/sqs_poller.rb` is used to handle the looping logic for the rake task. It invokes `lib/sqs_listener.rb` for each
iteration.

1. `lib/sqs_listener.rb` calls AWS SQS to receive messages and then passes each one into the `lib/sqs_listener/sqs_event_router.rb`
to be routed to the correct message handler.

1. Inside `lib/sqs_listener/sqs_event_router.rb` the message is parsed and passed through a case statement.
This could be abstracted better but for now if the message has a field of `event_type` and a value of `"task"` then
the router will send it off to the `TaskEventHandler` which in this case is
`lib/sqs_listener/typed_event_handler/task_event_handler.rb`

1. In the `TaskEventHandler` the task is sent to a handler which responds to the task specified in the message body field `task`.

1. The handler for the task (in this case, `DatabaseTasks`) handles the parameters for invoking the Sidekiq worker: `PgDiceWorker`

1. Finally, the `PgDiceWorker` is called and handles invoking `PgDice` based on the parameters passed in.


Hopefully that wasn't too confusing. There's a lot of steps in here because the system that uses PgDice handles lots
of different types of SQS events and needs to be as resilient as possible.
59 changes: 59 additions & 0 deletions examples/aws/cloudformation/scheduled_events.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{
"Description": "Deployment stack",
"Parameters": {
"PgDiceEnabled": {
"Type": "String",
"Description": "The ENABLED/DISABLED state of the cloudwatch scheduled events for PgDice."
}
},
"Resources": {
"PgDiceDailyAddPartitions": {
"DependsOn": "IncomingSQS",
"Type": "AWS::Events::Rule",
"Properties": {
"State":{
"Ref": "PgDiceEnabled"
},
"Description": " PgDice daily add partitions",
"Name": "PgDiceDailyAddPartitions",
"ScheduleExpression": "rate(1 day)",
"Targets": [
{
"Arn": {
"Fn::GetAtt": [
"IncomingSQS",
"Arn"
]
},
"Id": "PgDiceDailyAddPartitionsId",
"Input": "{\"event_type\":\"task\",\"task\":\"add_new_partitions\"}"
}
]
}
},
"PgDiceDailyDropPartitions": {
"DependsOn": "IncomingSQS",
"Type": "AWS::Events::Rule",
"Properties": {
"State":{
"Ref": "PgDiceEnabled"
},
"Description": " PgDice daily drop partitions",
"Name": "PgDiceDailyDropPartitions",
"ScheduleExpression": "rate(1 day)",
"Targets": [
{
"Arn": {
"Fn::GetAtt": [
"IncomingSQS",
"Arn"
]
},
"Id": "PgDiceDailyDropPartitionsId",
"Input": "{\"event_type\":\"task\",\"task\":\"drop_old_partitions\"}"
}
]
}
}
}
}
47 changes: 47 additions & 0 deletions examples/aws/lib/sqs_listener.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# frozen_string_literal: true

require 'aws-sdk-sqs'

# READ_ONLY_SQS can be set to ensure we don't delete good messages
class SqsListener
DEFAULT_VISIBILITY_TIMEOUT ||= 600
attr_reader :logger, :queue_url, :visibility_timeout

def initialize(opts = {})
@logger = opts[:logger] ||= Sidekiq.logger
@queue_url = opts[:queue_url] ||= ENV['SqsQueueUrl']
@sqs_client = opts[:sqs_client] ||= Aws::SQS::Client.new
@sqs_event_router = opts[:sqs_event_router] ||= SqsEventRouter.new(logger: logger)
increase_timeout_resolver = opts[:increase_timeout_resolver] ||= -> { ENV['READ_ONLY_SQS'].to_s == 'true' }
@visibility_timeout = calculate_visibility_timeout(increase_timeout_resolver.call)

logger.debug { "Running in environment: #{ENV['RAILS_ENV']} and using sqs queue: #{queue_url}" }
end

# http://docs.aws.amazon.com/sdk-for-ruby/v3/developer-guide/sqs-example-get-messages-with-long-polling.html
def call
# This uses long polling to retrieve sqs events so we can process them
response = @sqs_client.receive_message(queue_url: queue_url,
max_number_of_messages: 10,
wait_time_seconds: 20,
visibility_timeout: visibility_timeout)

if response.messages&.size&.positive?
logger.debug { "The number of messages received from the queue was: #{response.messages&.size}" }
end

# Iterate over all the messages in the response (Response is a Struct which acts like an object with methods)
response.messages&.each do |message|
@sqs_event_router.handle_message(message)
end
end

private

def calculate_visibility_timeout(increase_timeout)
visibility_timeout = increase_timeout ? DEFAULT_VISIBILITY_TIMEOUT * 4 : DEFAULT_VISIBILITY_TIMEOUT

logger.info { "Visibility timeout set to: #{visibility_timeout} seconds" }
visibility_timeout
end
end
32 changes: 32 additions & 0 deletions examples/aws/lib/sqs_listener/default_event_handler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

class DefaultEventHandler
attr_reader :logger

def initialize(opts = {})
@logger = opts[:logger] ||= Sidekiq.logger
@fallthrough_event_handler = opts[:fallthrough_event_handler] ||= FallthroughEventHandler.new(logger: logger)
end

def handle_message(message)
# Since 'message' is a JSON formatted string, parse the JSON and then get the values under the 'Records' key
# When JSON parses a string it returns a Ruby Hash (just like a Java HashMap)
records = JSON.parse(message.body)['Records']
if records
process_records(records, message)
else
# If the message body doesn't have any entries under the 'Records' key then we don't know what to do.
@fallthrough_event_handler.call(message)
end
rescue StandardError => e
# If any errors are raised processing this message then call the fallthrough because something went wrong.
logger.error { "Caught error while handling incoming message. Calling fallthrough_event_handler. Error: #{e}" }
@fallthrough_event_handler.call(message)
end

private

def process_records(records, message)
# Process default event
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# frozen_string_literal: true

class UnknownTaskError < StandardError
end
18 changes: 18 additions & 0 deletions examples/aws/lib/sqs_listener/fallthrough_event_handler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# frozen_string_literal: true

class FallthroughEventHandler
attr_reader :logger

def initialize(opts = {})
@logger = opts[:logger] ||= Sidekiq.logger
@sqs_message_deleter = opts[:sqs_message_deleter] ||= SqsMessageDeleter.new(logger: logger)
end

def call(message)
logger.warn do
"Received sqs message we don't know how to process. Message: #{message}"
end

@sqs_message_deleter.call(message.receipt_handle)
end
end
32 changes: 32 additions & 0 deletions examples/aws/lib/sqs_listener/sqs_event_router.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# frozen_string_literal: true

# Responsible for routing incoming SQS events to the correct handler
class SqsEventRouter
attr_reader :logger

def initialize(opts = {})
@logger = opts[:logger] ||= Sidekiq.logger
@task_event_handler = opts[:task_event_handler] ||= TaskEventHandler.new(logger: logger)
@default_event_handler = opts[:default_event_handler] ||= DefaultEventHandler.new(logger: logger)
@sqs_message_deleter = opts[:sqs_message_deleter] ||= SqsMessageDeleter.new(logger: logger)
end

# Handles incoming sqs event, looking for a field of 'event_type'
# See scheduled_events.json for details on how to create task events from cloudwatch
def handle_message(message)
message_body = JSON.parse(message.body).with_indifferent_access
event_type = message_body[:event_type]

logger.tagged(message.receipt_handle) do
logger.debug { "The received message was: #{message}" }

case event_type
when 'task'
@task_event_handler.run_task(message_body)
@sqs_message_deleter.call(message.receipt_handle)
else
@default_event_handler.handle_message(message)
end
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# frozen_string_literal: true

class TaskEventHandler
attr_reader :logger

def initialize(opts = {})
@logger = opts[:logger] ||= Sidekiq.logger
@task_handlers = [opts[:task_handlers] ||= initialize_default_handlers].flatten.compact
end

def run_task(message_body_hash)
task = message_body_hash.fetch(:task).to_sym
logger.debug { "Running task: #{task}. Searching for task in: #{@task_handlers}" }

task_handlers = resolve_task_handlers(task)

if task_handlers.blank?
raise UnknownTaskError, "Could not find task: #{task} in any of the available task_handlers: #{@task_handlers}"
end

invoke_task_handler(task_handlers.first, task, message_body_hash.fetch(:parameters, {}))
end

private

def resolve_task_handlers(task)
task_handlers = @task_handlers.select { |task_handler| task_handler.respond_to?(task) }

task_handlers.each do |task_handler|
logger.debug { "Found task handler: #{task_handler.class} that can handle task: #{task}" }
end
task_handlers
end

def invoke_task_handler(task_handler, task, params)
logger.debug { "Invoking handler: #{task_handler.class}##{task} with params: #{params}" }
task_handler.public_send(task, params)
end

def initialize_default_handlers
[
DatabaseTasks.new
# Other tasks go here
]
end
end

0 comments on commit 70a71a4

Please sign in to comment.