-
Notifications
You must be signed in to change notification settings - Fork 82
/
message_action.py
145 lines (126 loc) · 5.57 KB
/
message_action.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# Standard Library
import json
import os
# Third Party
import boto3
# First Party
from smdebug.core.logger import get_logger
# Action specification handled by MessageAction:
# {'name': 'sms' or 'email', 'endpoint': phone number or email id}
class MessageAction:
    """Notify an SMS or e-mail endpoint through Amazon SNS when a rule fires.

    On construction the action makes sure the SNS topic ``SMDebugRules``
    exists and subscribes the given endpoint to it with a filter policy keyed
    on that endpoint. ``invoke`` then publishes a message carrying the matching
    message attribute.

    Errors raised by the SNS calls inside the helper methods are caught and
    logged; the helpers report failure by returning ``None``/``False`` instead
    of raising. An unsupported ``message_type`` leaves the action inert:
    ``invoke`` becomes a logged no-op.
    """

    def __init__(self, rule_name, message_type, message_endpoint):
        """Set up the SNS client, topic and subscription for this action.

        Args:
            rule_name: Name of the rule; embedded in the message and subject.
            message_type: ``"sms"`` or ``"email"``. Anything else disables the
                action (``self._protocol`` is set to ``None``).
            message_endpoint: Phone number (sms) or e-mail address (email).
        """
        self._topic_name = "SMDebugRules"
        self._logger = get_logger()

        # Initialise every attribute up front so that an instance created with
        # an unsupported message type can still be invoked without raising
        # AttributeError.
        self._rule_name = rule_name
        self._message_endpoint = message_endpoint
        self._sns_client = None
        self._topic_arn = None
        # The two fields below exist to help tests inspect the last responses.
        self._last_send_mesg_response = None
        self._last_subscription_response = None

        if message_type not in ("sms", "email"):
            self._protocol = None
            self._logger.info(
                f"Unsupported message type:{message_type} in MessageAction. Returning"
            )
            return
        self._protocol = message_type

        env_region_name = os.getenv("AWS_REGION", "us-east-1")
        self._sns_client = boto3.client("sns", region_name=env_region_name)
        self._topic_arn = self._create_sns_topic_if_not_exists()
        self._subscribe_mesgtype_endpoint()
        self._logger.info(
            f"Registering messageAction with protocol:{self._protocol} endpoint:{self._message_endpoint} and topic_arn:{self._topic_arn} region:{env_region_name}"
        )

    def _filter_attribute_name(self):
        """Return the attribute key shared by the filter policy and messages."""
        return "phone_num" if self._protocol == "sms" else "email"

    def _create_sns_topic_if_not_exists(self):
        """Create the SNS topic and return its ARN, or ``None`` on failure.

        ``create_topic`` is documented by boto3 as idempotent, so calling it
        for an already-existing topic name returns that topic's ARN.
        """
        try:
            topic = self._sns_client.create_topic(Name=self._topic_name)
            self._logger.info(
                f"topic_name: {self._topic_name} , creating topic returned response:{topic}"
            )
            if topic:
                return topic["TopicArn"]
        except Exception as e:
            self._logger.info(
                f"Caught exception while creating topic:{self._topic_name} exception is: \n {e}"
            )
        return None

    def _check_subscriptions(self, topic_arn, protocol, endpoint):
        """Return ``True`` if a matching subscription already exists.

        Walks every page returned by ``list_subscriptions`` looking for an
        entry whose protocol, topic ARN and endpoint all match.

        Args:
            topic_arn: ARN of the topic the subscription must belong to.
            protocol: Subscription protocol to match.
            endpoint: Subscription endpoint to match.

        Returns:
            ``True`` when a match is found; ``False`` when none is found or
            when listing fails (the error is logged).
        """
        wanted = (protocol, topic_arn, endpoint)
        try:
            page = self._sns_client.list_subscriptions()
            while True:
                for sub_dict in page["Subscriptions"]:
                    found = (
                        sub_dict["Protocol"],
                        sub_dict["TopicArn"],
                        sub_dict["Endpoint"],
                    )
                    if found == wanted:
                        self._logger.info(f"Existing Subscription found: {sub_dict}")
                        return True
                # A missing (or empty) NextToken marks the last page.
                next_token = page.get("NextToken")
                if not next_token:
                    break
                page = self._sns_client.list_subscriptions(NextToken=next_token)
        except Exception as e:
            self._logger.info(
                f"Caught exception while list subscription topic:{self._topic_name} exception is: \n {e}"
            )
        return False

    def _subscribe_mesgtype_endpoint(self):
        """Subscribe the endpoint to the topic unless it is already subscribed.

        The outcome is stored in ``self._last_subscription_response``: the
        ``subscribe`` response, a "Subscription exists ..." string when a
        matching subscription was found, or ``None`` when the topic ARN,
        protocol or endpoint is missing or the call failed.
        """
        response = None
        try:
            if self._topic_arn and self._protocol and self._message_endpoint:
                # The policy key/value mirrors the message attribute attached
                # in _send_message.
                filter_policy = {self._filter_attribute_name(): [self._message_endpoint]}
                already_subscribed = self._check_subscriptions(
                    self._topic_arn, self._protocol, self._message_endpoint
                )
                if already_subscribed:
                    response = f"Subscription exists for topic:{self._topic_arn}, protocol:{self._protocol}, endpoint:{self._message_endpoint}"
                else:
                    response = self._sns_client.subscribe(
                        TopicArn=self._topic_arn,
                        Protocol=self._protocol,  # sms or email
                        Endpoint=self._message_endpoint,  # phone number or email address
                        Attributes={"FilterPolicy": json.dumps(filter_policy)},
                        ReturnSubscriptionArn=False,  # True means always return ARN
                    )
        except Exception as e:
            self._logger.info(
                f"Caught exception while subscribing endpoint on topic:{self._topic_arn} exception is: \n {e}"
            )
        self._logger.info(f"response for sns subscribe is {response} ")
        self._last_subscription_response = response

    def _send_message(self, message):
        """Publish ``message`` to the topic and return the SNS response.

        The text is prefixed with ``SMDebugRule:<rule_name> fired.`` and is
        published with a message attribute naming this action's endpoint.

        Args:
            message: Free-form text appended to the standard prefix.

        Returns:
            The ``publish`` response, or ``None`` when the action is not
            configured (no protocol or no topic ARN) or publishing failed.
            The same value is stored in ``self._last_send_mesg_response``.
        """
        response = None
        if self._protocol is None or not self._topic_arn:
            # Nothing to publish to: either the message type was unsupported
            # or topic creation failed during construction.
            self._logger.info(
                f"MessageAction is not configured (protocol:{self._protocol}, topic_arn:{self._topic_arn}). Skipping send"
            )
            self._last_send_mesg_response = response
            return response

        message = f"SMDebugRule:{self._rule_name} fired. {message}"
        try:
            msg_attributes = {
                self._filter_attribute_name(): {
                    "DataType": "String",
                    "StringValue": self._message_endpoint,
                }
            }
            response = self._sns_client.publish(
                TopicArn=self._topic_arn,
                Message=message,
                Subject=f"SMDebugRule:{self._rule_name} fired",
                MessageAttributes=msg_attributes,
            )
        except Exception as e:
            self._logger.info(
                f"Caught exception while publishing message on topic:{self._topic_arn} exception is: \n {e}"
            )
        self._logger.info(f"Response of send message:{response}")
        self._last_send_mesg_response = response
        return response

    def invoke(self, message):
        """Run the action: send ``message`` to the configured endpoint."""
        self._send_message(message)