/
rds_handler.py
124 lines (96 loc) · 4.44 KB
/
rds_handler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
"""rds instances scheduler."""
from typing import Iterator
import boto3
from botocore.exceptions import ClientError
from .exceptions import rds_exception
class RdsScheduler(object):
"""Abstract rds scheduler in a class."""
def __init__(self, region_name=None) -> None:
"""Initialize rds scheduler."""
if region_name:
self.rds = boto3.client("rds", region_name=region_name)
else:
self.rds = boto3.client("rds")
def stop(self, tag_key: str, tag_value: str) -> None:
"""Aws rds cluster and instance stop function.
Stop rds aurora clusters and rds db instances with defined tag.
:param str tag_key:
Aws tag key to use for filter resources
:param str tag_value:
Aws tag value to use for filter resources
"""
for cluster_id in self.list_clusters(tag_key, tag_value):
try:
self.rds.stop_db_cluster(DBClusterIdentifier=cluster_id)
print("Stop rds cluster {0}".format(cluster_id))
except ClientError as exc:
rds_exception("rds cluster", cluster_id, exc)
for instance_id in self.list_instances(tag_key, tag_value):
try:
self.rds.stop_db_instance(DBInstanceIdentifier=instance_id)
print("Stop rds instance {0}".format(instance_id))
except ClientError as exc:
rds_exception("rds instance", instance_id, exc)
def start(self, tag_key: str, tag_value: str) -> None:
"""Aws rds cluster start function.
Start rds aurora clusters and db instances a with defined tag.
:param str tag_key:
Aws tag key to use for filter resources
:param str tag_value:
Aws tag value to use for filter resources
"""
for cluster_id in self.list_clusters(tag_key, tag_value):
try:
self.rds.start_db_cluster(DBClusterIdentifier=cluster_id)
print("Start rds cluster {0}".format(cluster_id))
except ClientError as exc:
rds_exception("rds cluster", cluster_id, exc)
for instance_id in self.list_instances(tag_key, tag_value):
try:
self.rds.start_db_instance(DBInstanceIdentifier=instance_id)
print("Start rds instance {0}".format(instance_id))
except ClientError as exc:
rds_exception("rds instance", instance_id, exc)
def list_clusters(self, tag_key: str, tag_value: str) -> Iterator[str]:
"""Aws rds cluster list function.
Return the list of all rds clusters
:param str tag_key:
Aws tag key to use for filter resources
:param str tag_value:
Aws tag value to use for filter resources
:yield Iterator[str]:
The list Id of filtered rds clusters
"""
paginator = self.rds.get_paginator("describe_db_clusters")
for page in paginator.paginate():
for custer in page["DBClusters"]:
response_cluster = self.rds.list_tags_for_resource(
ResourceName=custer["DBClusterArn"]
)
taglist = response_cluster["TagList"]
# Retrieve rds cluster with specific tag
for tag in taglist:
if tag["Key"] == tag_key and tag["Value"] == tag_value:
yield custer["DBClusterIdentifier"]
def list_instances(self, tag_key: str, tag_value: str) -> Iterator[str]:
"""Aws rds instance list function.
Return the list of all rds instances
:param str tag_key:
Aws tag key to use for filter resources
:param str tag_value:
Aws tag value to use for filter resources
:yield Iterator[str]:
The list Id of filtered rds instances
"""
paginator = self.rds.get_paginator("describe_db_instances")
for page in paginator.paginate():
for instance in page["DBInstances"]:
reponse_instance = self.rds.list_tags_for_resource(
ResourceName=instance["DBInstanceArn"]
)
taglist = reponse_instance["TagList"]
# Retrieve rds instance with specific tag
for tag in taglist:
if tag["Key"] == tag_key and tag["Value"] == tag_value:
yield instance["DBInstanceIdentifier"]