Adding ElastiCache Hook for creating, describing and deleting replication groups #8701
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
This needs some unit tests adding too please.
(I have only given the code a cursory glance at this point)
Thanks @ashb , I will work on the changes.
@ashb Made the changes as per your suggestion. Also added unit tests, but moto does not support mocking AWS ElastiCache, hence the tests won't run.
    try:
        from moto import mock_elasticache
    except ImportError:
        mock_elasticache = None

    @unittest.skipIf(mock_elasticache is None, 'mock_elasticache package not available')
But this does not seem to work; we get the following error:
Let me know if there is some other way to run the tests.
    """
    Call ElastiCache API for creating a replication group
    :param config: Python dictionary to use as config for creating replication group
    :return: Response from ElastiCache create replication group API
    """
All of your doc comments need a blank line between prose and params, else they won't render correctly.
- """
- Call ElastiCache API for creating a replication group
- :param config: Python dictionary to use as config for creating replication group
- :return: Response from ElastiCache create replication group API
- """
+ """
+ Call ElastiCache API for creating a replication group
+
+ :param config: Python dictionary to use as config for creating replication group
+ :return: Response from ElastiCache create replication group API
+ """
See individual comments
Rebased with latest upstream master, hence the force push.
@ashb made the changes. Tests are failing because moto does not support mocking ElastiCache. Do I need to write a custom mock for this? Let me know how we should test this.
@VijayantSoni Probably just using
@ashb updated the tests to use unittest.mock, and they are working fine. I don't understand the other failing checks, as they don't seem to be related to the changes made here. Can you please help with those?
@ashb rebased with latest master, which has resolved the failing checks. Please take a look when you get a chance. Thanks!
I'm not sure an operator like this belongs in Airflow.
Yes, it could, but there are better tools for managing (creating/deleting) resources such as HashiCorp's Terraform, or AWS CloudFormation (which already has an operator in master).
Can you describe in more detail why you want this please?
Hi @ashb, I see your point, and yes, we can discuss it. Consider the following scenario:
The basic idea is to create resources when needed and terminate them once the work is done. This flow can be orchestrated within Airflow itself, hence the PR. Regarding managing resources via Terraform or CloudFormation, I agree with that, but is it an anti-pattern to have this kind of hook in Airflow? I do see the following hooks which provision resources in AWS infra:
@ashb did you get a chance to take a look at this? Thanks
Gotcha, I guess this one makes sense. Happy with the feature now. (I'm not sure I have time to review this again before I go on paternity leave for six weeks!)
Your tests still need some work -- you aren't actually importing the hook you want to test in your new test file, so you haven't tested any of the code.
You probably want to do something like:
    def setUp(self):
        self.hook = ElastiCacheHook()
        setattr(self.hook, 'conn', Mock())
and then set various properties/return values of self.hook.conn (similar to how you are doing on self.hook), the goal being to test all the code in your Hook class, but not reach out to an actual AWS API.
I won't be able to re-review this for 6 weeks, so you'll have to ask someone in Slack for a re-review -- I'd suggest one of @kaxil, @mik-laj or @potiuk.
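The suggestion above (mock `hook.conn`, not the hook itself) can be sketched roughly as follows. This is only an illustration: `SketchElastiCacheHook` is a hypothetical stand-in written for this example, not the hook from the PR, and it assumes the hook delegates to a boto3-style client stored on `conn`.

```python
import unittest
from unittest.mock import Mock


class SketchElastiCacheHook:
    """Hypothetical stand-in for the hook under review; not the real Airflow class."""

    def __init__(self):
        # Assumption: the real hook would hold a boto3 ElastiCache client here.
        self.conn = None

    def describe_replication_group(self, replication_group_id):
        return self.conn.describe_replication_groups(ReplicationGroupId=replication_group_id)

    def get_replication_group_status(self, replication_group_id):
        response = self.describe_replication_group(replication_group_id)
        return response["ReplicationGroups"][0]["Status"]


class TestSketchElastiCacheHook(unittest.TestCase):
    def setUp(self):
        self.hook = SketchElastiCacheHook()
        # Replace only the AWS client, so the hook's own methods still execute.
        self.hook.conn = Mock()

    def test_get_replication_group_status(self):
        self.hook.conn.describe_replication_groups.return_value = {
            "ReplicationGroups": [{"ReplicationGroupId": "test-group", "Status": "available"}]
        }

        status = self.hook.get_replication_group_status("test-group")

        self.assertEqual(status, "available")
        # The real method ran and reached the (mocked) client exactly once.
        self.hook.conn.describe_replication_groups.assert_called_once_with(
            ReplicationGroupId="test-group"
        )
```

Because only `conn` is a mock, a bug in `get_replication_group_status` (for example a wrong response key) would make the test fail, which is the point of the review comment.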
    :param replication_group_id: ID of replication group to check for status
    :return: Current status of replication group
    """
    return self.describe_replication_group(replication_group_id)[self.REPLICATION_GROUPS][0][self.STATUS]
- return self.describe_replication_group(replication_group_id)[self.REPLICATION_GROUPS][0][self.STATUS]
+ return self.describe_replication_group(replication_group_id)["ReplicationGroups"][0]["Status"]
    # Constants for ElastiCache describe API response keys
    REPLICATION_GROUPS = 'ReplicationGroups'
    REPLICATION_GROUP = 'ReplicationGroup'
    STATUS = 'Status'

    # Constants for ElastiCache replication group status
    STATUS_CREATING = 'creating'
    STATUS_AVAILABLE = 'available'
    STATUS_DELETING = 'deleting'
    STATUS_CREATE_FAILED = 'create-failed'
    STATUS_MODIFYING = 'modifying'
    STATUS_SNAPSHOTTING = 'snapshotting'
- # Constants for ElastiCache describe API response keys
- REPLICATION_GROUPS = 'ReplicationGroups'
- REPLICATION_GROUP = 'ReplicationGroup'
- STATUS = 'Status'
- # Constants for ElastiCache replication group status
- STATUS_CREATING = 'creating'
- STATUS_AVAILABLE = 'available'
- STATUS_DELETING = 'deleting'
- STATUS_CREATE_FAILED = 'create-failed'
- STATUS_MODIFYING = 'modifying'
- STATUS_SNAPSHOTTING = 'snapshotting'
These constants make the code harder to understand and aren't the style we use in Airflow; please use the string literals directly.
    :param replication_group_id: ID of replication group to check for availability
    :return: True if available else False
    """
    return self.get_replication_group_status(replication_group_id) == self.STATUS_AVAILABLE
- return self.get_replication_group_status(replication_group_id) == self.STATUS_AVAILABLE
+ return self.get_replication_group_status(replication_group_id) == "available"
    return status in (
        self.STATUS_AVAILABLE,
        self.STATUS_CREATE_FAILED,
        self.STATUS_DELETING
    ), status
- return status in (
-     self.STATUS_AVAILABLE,
-     self.STATUS_CREATE_FAILED,
-     self.STATUS_DELETING
- ), status
+ return status in (
+     "available",
+     "create-failed",
+     "deleting"
+ ), status
This one could be worth extracting to a class variable:
TERMINAL_STATES = frozenset({"available", "create-failed", "deleting"})
And then this becomes
- return status in (
-     self.STATUS_AVAILABLE,
-     self.STATUS_CREATE_FAILED,
-     self.STATUS_DELETING
- ), status
+ return status in self.TERMINAL_STATES, status
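A small sketch of how the class-level `TERMINAL_STATES` constant behaves. The class and method names here (`ReplicationGroupStatusChecker`, `is_terminal`) are hypothetical and exist only for illustration; the tuple return shape mirrors the `return status in ..., status` line in the suggestion.

```python
class ReplicationGroupStatusChecker:
    """Hypothetical helper, used only to illustrate the class-level constant."""

    # frozenset gives O(1) membership checks and cannot be mutated by accident.
    TERMINAL_STATES = frozenset({"available", "create-failed", "deleting"})

    def is_terminal(self, status):
        # Parsed as the tuple ((status in TERMINAL_STATES), status).
        return status in self.TERMINAL_STATES, status


checker = ReplicationGroupStatusChecker()
print(checker.is_terminal("creating"))   # (False, 'creating')
print(checker.is_terminal("available"))  # (True, 'available')
```

Note that the comma binds more loosely than `in`, so the method returns a two-element tuple rather than testing membership of a tuple.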
        max_retries=None
    ):
        """
        Check if replication is available or not by performing a describe over it
This doc talks about replication -- but is it just health of elasticache? (Because this still applies when a single node EC is used, right?)
    sleep_time = sleep_time * exponential_back_off_factor

    if status != self.STATUS_AVAILABLE:
- if status != self.STATUS_AVAILABLE:
+ if status != "available":
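For context, the `sleep_time = sleep_time * exponential_back_off_factor` line in the diff above belongs to a polling loop. The following is a rough, standalone sketch of such a loop, not the PR's implementation. It assumes `max_retries` counts retries after the initial try (as a later commit message in this thread describes); `get_status` and the injectable `sleep` parameter are hypothetical additions to keep the example testable.

```python
import time


def wait_for_availability(
    get_status,
    initial_sleep_time=1.0,
    exponential_back_off_factor=2.0,
    max_retries=2,
    sleep=time.sleep,
):
    """
    Poll until the status is "available" or the retries run out.

    :param get_status: hypothetical zero-argument callable returning a status string
    :param initial_sleep_time: seconds to wait before the first retry
    :param exponential_back_off_factor: multiplier applied to the wait after each retry
    :param max_retries: number of retries after the initial try (assumed semantics)
    :param sleep: hypothetical hook for injecting a fake sleep in tests
    :return: True if the status became "available", else False
    """
    sleep_time = initial_sleep_time
    # One initial try plus max_retries retries.
    for attempt in range(max_retries + 1):
        if get_status() == "available":
            return True
        if attempt < max_retries:
            sleep(sleep_time)
            sleep_time = sleep_time * exponential_back_off_factor
    return False


statuses = iter(["creating", "creating", "available"])
waits = []
print(wait_for_availability(lambda: next(statuses), sleep=waits.append))  # True
print(waits)  # [1.0, 2.0]
```

Passing `waits.append` as `sleep` records the intended waits instead of blocking, which keeps a unit test for the back-off schedule fast.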
    :param max_retries: Max tries for checking availability of replication group
    :param exponential_back_off_factor: Factor for deciding next sleep time
    :param initial_sleep_time: Initial sleep time in seconds
    :param replication_group_id: ID of replication group to check for availability
Could you add types to these please?
- :param max_retries: Max tries for checking availability of replication group
- :param exponential_back_off_factor: Factor for deciding next sleep time
- :param initial_sleep_time: Initial sleep time in seconds
- :param replication_group_id: ID of replication group to check for availability
+ :param max_retries: Max tries for checking availability of replication group
+ :type max_retries: int
+ :param exponential_back_off_factor: Factor for deciding next sleep time
+ :type exponential_back_off_factor: ...
+ :param initial_sleep_time: Initial sleep time in seconds
+ :type initial_sleep_time:
+ :param replication_group_id: ID of replication group to check for availability
+ :type replication_group_id:
Same for all the other public methods too please.
        response = self.delete_replication_group(replication_group_id=replication_group_id)

    except self.conn.exceptions.ReplicationGroupNotFoundFault:
        self.log.info("Replication group with ID : '%s' does not exist", replication_group_id)
- self.log.info("Replication group with ID : '%s' does not exist", replication_group_id)
+ self.log.info("Replication group with ID '%s' does not exist", replication_group_id)
    message = exp.response['Error']['Message']

    self.log.info('Received message : %s', message)
- self.log.info('Received message : %s', message)
+ self.log.error('Received error message from AWS API: %s', message)
(Or maybe at warning?)
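A minimal sketch of the logging change being discussed, using only the standard library. `FakeClientError` is a hypothetical stand-in shaped like botocore's `ClientError` (which carries a `response` dict with `Error` and `Message` keys); `delete_group`, `delete_call` and the logger name are likewise made up for this example.

```python
import logging

log = logging.getLogger("elasticache_sketch")  # hypothetical logger name


class FakeClientError(Exception):
    """Hypothetical stand-in shaped like botocore's ClientError."""

    def __init__(self, message):
        super().__init__(message)
        self.response = {"Error": {"Message": message}}


def delete_group(delete_call, replication_group_id):
    """Call ``delete_call`` and log an API error instead of re-raising it."""
    try:
        return delete_call(replication_group_id)
    except FakeClientError as exp:
        message = exp.response["Error"]["Message"]
        # %-style arguments are formatted lazily, only if the record is emitted.
        log.error("Received error message from AWS API: %s", message)
        return None
```

Whether `error` or `warning` fits better depends on whether the caller treats the failure as recoverable; the sketch uses `error` to match the suggested change.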
    def test_create_replication_group(self):
        """
        Test creation of replication group
        """
        self.hook.create_replication_group.return_value = {
            "ReplicationGroup": {
                "ReplicationGroupId": self.replication_group_id,
                "Status": "creating"
            }
        }
        response = self.hook.create_replication_group(config=self.replication_group_config)

        assert response["ReplicationGroup"]["ReplicationGroupId"] == self.replication_group_id
        assert response["ReplicationGroup"]["Status"] == "creating"
This isn't testing real code here -- you are only calling a mock function but not actually calling the real code anywhere from the hook.
@VijayantSoni It looks like a flaky test. Can you do a rebase? We have recently made changes to improve the stability of integration tests.
Yeah @VijayantSoni -> this particular problem (transient) is likely to be already solved, so if you rebase on top of the latest master it should succeed.
Slightly related question here: do you all prefer a rebase on master over a
merge? Or does it not matter since there is effectively no difference since
all PRs are squashed and merged?
Thanks!
On Thu, Aug 6, 2020 at 08:32 Jarek Potiuk wrote:
> Yeah @VijayantSoni -> this particular problem (transient) is likely to be already solved, so if you rebase on top of the latest master it should succeed.
--
Cooper
https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#id8 A rebase is recommended as it allows us to better manage the changes. It is much easier to review the history of the change if you rebase instead of merging.
Codecov Report
@@ Coverage Diff @@
## master #8701 +/- ##
===========================================
- Coverage 89.42% 34.71% -54.71%
===========================================
Files 1037 1038 +1
Lines 49985 49771 -214
===========================================
- Hits 44699 17279 -27420
- Misses 5286 32492 +27206
…g a delete request, doc string formatting, remove redundant log, minor refactor
…her types as well within ElastiCache
Removed separate string constants for API response and replaced with hardcoded strings everywhere
Fixed tests to mock hook.conn and not hook, updated tests accordingly
Changed semantics of max_retries for polling: it now means the number of retries excluding the first try, so max_retries = 2 means 2 retries after the initial try
Fix name of test file to correspond with hook file name
Some Python cosmetic changes
Replace occurrences of elasticache-hook with elasticache-replication-group-hook
Remove doc strings from tests
Fix formatting in operators-and-hooks-ref.rst
…ing feature
Moved doc string from __init__ method to class doc string
…rmatting (only 1 place)
…ture
Adding default behaviour of params to doc string
Hi @feluelle, can you please help with understanding the static checks?
    response = self.hook.wait_for_availability(
        replication_group_id=self.REPLICATION_GROUP_ID,
        max_retries=1,
        initial_sleep_time=1,  # seconds
    )
to this:
    response = self.hook.wait_for_availability(
        replication_group_id=self.REPLICATION_GROUP_ID, max_retries=1, initial_sleep_time=1,  # seconds
    )
and now it is asking to do the reverse (for all such cases in the code). I am confused 😕
It might have something to do with the recent change of black version. See #10818. So I think yes, you have to do the reverse.
@feluelle resolved static check issues, please have a look when you get a chance. Thanks!
LGTM @VijayantSoni 👍
Sorry for the late reply.
@ashb do you have time to review once more? (You requested changes.)
👀
Awesome work, congrats on your first merged pull request!
Adding ElastiCache Hook for creating, describing and deleting replication groups.
This hook interacts with AWS ElastiCache and supports the following operations: