Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Amazon CloudWatch query runners #4372

Merged
merged 6 commits into from Nov 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Binary file added client/app/assets/images/db-logos/cloudwatch.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
104 changes: 104 additions & 0 deletions redash/query_runner/cloudwatch.py
@@ -0,0 +1,104 @@
import boto3
import yaml
import datetime

from redash.query_runner import BaseQueryRunner, register
from redash.utils import json_dumps


def parse_response(results):
    """Flatten CloudWatch GetMetricData results into Redash rows/columns.

    :param results: list of MetricDataResult dicts, each carrying "Id",
        "Label" and the parallel arrays "Timestamps"/"Values".
    :returns: ``(rows, columns)`` tuple in Redash result format.
    """
    columns = [
        {"name": "id", "type": "string"},
        {"name": "label", "type": "string"},
        {"name": "timestamp", "type": "datetime"},
        {"name": "value", "type": "float"},
    ]

    # "Timestamps" and "Values" are parallel arrays in the API response;
    # zip pairs them directly instead of indexing by loop position.
    rows = [
        {
            "id": metric["Id"],
            "label": metric["Label"],
            "timestamp": timestamp,
            "value": value,
        }
        for metric in results
        for timestamp, value in zip(metric["Timestamps"], metric["Values"])
    ]

    return rows, columns


class CloudWatch(BaseQueryRunner):
    """Query runner for Amazon CloudWatch metrics.

    Queries are authored as YAML documents whose keys are passed verbatim
    as keyword arguments to the boto3 ``get_metric_data`` paginator.
    """

    should_annotate_query = False

    @classmethod
    def name(cls):
        # Display name shown in the data source picker.
        return "Amazon CloudWatch"

    @classmethod
    def configuration_schema(cls):
        """JSON schema describing the data source configuration form."""
        return {
            "type": "object",
            "properties": {
                "region": {"type": "string", "title": "AWS Region"},
                "aws_access_key": {"type": "string", "title": "AWS Access Key"},
                "aws_secret_key": {"type": "string", "title": "AWS Secret Key"},
            },
            "required": ["region", "aws_access_key", "aws_secret_key"],
            "order": ["region", "aws_access_key", "aws_secret_key"],
            "secret": ["aws_secret_key"],
        }

    def __init__(self, configuration):
        super(CloudWatch, self).__init__(configuration)
        # Queries for this runner are written in YAML.
        self.syntax = "yaml"

    def test_connection(self):
        # Listing metrics exercises both the credentials and the region.
        self.get_schema()

    def _get_client(self):
        """Build a boto3 CloudWatch client from the data source configuration."""
        return boto3.client(
            "cloudwatch",
            region_name=self.configuration.get("region"),
            aws_access_key_id=self.configuration.get("aws_access_key"),
            aws_secret_access_key=self.configuration.get("aws_secret_key"),
        )

    def get_schema(self, get_stats=False):
        """Expose metric namespaces as "tables" and metric names as "columns"."""
        client = self._get_client()

        namespaces = {}
        for page in client.get_paginator("list_metrics").paginate():
            for metric in page["Metrics"]:
                namespace = metric["Namespace"]
                entry = namespaces.setdefault(
                    namespace, {"name": namespace, "columns": []}
                )
                metric_name = metric["MetricName"]
                # The same metric name can appear once per dimension set;
                # only record it the first time.
                if metric_name not in entry["columns"]:
                    entry["columns"].append(metric_name)

        return list(namespaces.values())

    def run_query(self, query, user):
        """Execute a YAML GetMetricData query and return Redash JSON results."""
        client = self._get_client()
        params = yaml.safe_load(query)

        metric_data = []
        for page in client.get_paginator("get_metric_data").paginate(**params):
            metric_data.extend(page["MetricDataResults"])

        rows, columns = parse_response(metric_data)
        return json_dumps({"rows": rows, "columns": columns}), None


register(CloudWatch)
148 changes: 148 additions & 0 deletions redash/query_runner/cloudwatch_insights.py
@@ -0,0 +1,148 @@
import boto3
import yaml
import datetime
import time

from botocore.exceptions import ParamValidationError

from redash.query_runner import BaseQueryRunner, register
from redash.utils import json_dumps, parse_human_time

POLL_INTERVAL = 3
TIMEOUT = 180


def parse_response(response):
    """Convert a CloudWatch Logs Insights result set into Redash format.

    :param response: ``get_query_results`` response dict with "results"
        (list of rows, each a list of {"field", "value"} cells) and
        "statistics".
    :returns: dict with "columns", "rows" and scan-size "metadata".
    """
    rows = []
    field_orders = {}

    for result_row in response["results"]:
        record = {}
        for position, cell in enumerate(result_row):
            name = cell["field"]
            if name == "@ptr":
                # @ptr is an internal record pointer; don't surface it.
                continue
            record[name] = cell["value"]
            # Track the right-most position each field was seen at so the
            # final column order reflects the query's field order.
            if position > field_orders.get(name, -1):
                field_orders[name] = position
        rows.append(record)

    ordered_fields = sorted(field_orders, key=field_orders.get)
    columns = []
    for name in ordered_fields:
        columns.append(
            {
                "name": name,
                "type": "datetime" if name == "@timestamp" else "string",
                "friendly_name": name,
            }
        )

    return {
        "columns": columns,
        "rows": rows,
        "metadata": {"data_scanned": response["statistics"]["bytesScanned"]},
    }


def parse_query(query):
    """Parse a YAML Insights query, normalizing time bounds to epoch seconds.

    String values for ``startTime``/``endTime`` (e.g. "-2 hours") are
    resolved with ``parse_human_time``; a missing ``endTime`` defaults
    to the current time.
    """
    parsed = yaml.safe_load(query)

    for time_key in ("startTime", "endTime"):
        value = parsed.get(time_key)
        if isinstance(value, str):
            # Human-readable relative times become absolute epoch seconds.
            parsed[time_key] = int(parse_human_time(value).timestamp())

    if not parsed.get("endTime"):
        # Default the query window's end to "now".
        parsed["endTime"] = int(datetime.datetime.now().timestamp())

    return parsed


class CloudWatchInsights(BaseQueryRunner):
    """Query runner for Amazon CloudWatch Logs Insights.

    Queries are YAML documents passed as keyword arguments to the boto3
    ``start_query`` call; results are polled until completion or timeout.
    """

    should_annotate_query = False

    @classmethod
    def name(cls):
        # Display name shown in the data source picker.
        return "Amazon CloudWatch Logs Insights"

    @classmethod
    def type(cls):
        return "cloudwatch_insights"

    @classmethod
    def configuration_schema(cls):
        """JSON schema describing the data source configuration form."""
        return {
            "type": "object",
            "properties": {
                "region": {"type": "string", "title": "AWS Region"},
                "aws_access_key": {"type": "string", "title": "AWS Access Key"},
                "aws_secret_key": {"type": "string", "title": "AWS Secret Key"},
            },
            "required": ["region", "aws_access_key", "aws_secret_key"],
            "order": ["region", "aws_access_key", "aws_secret_key"],
            "secret": ["aws_secret_key"],
        }

    def __init__(self, configuration):
        super(CloudWatchInsights, self).__init__(configuration)
        # Queries for this runner are written in YAML.
        self.syntax = "yaml"

    def test_connection(self):
        # Listing log groups exercises both the credentials and the region.
        self.get_schema()

    def _get_client(self):
        """Build a boto3 CloudWatch Logs client from the configuration."""
        cloudwatch = boto3.client(
            "logs",
            region_name=self.configuration.get("region"),
            aws_access_key_id=self.configuration.get("aws_access_key"),
            aws_secret_access_key=self.configuration.get("aws_secret_key"),
        )
        return cloudwatch

    def get_schema(self, get_stats=False):
        """Expose log groups as "tables" and their fields as "columns"."""
        client = self._get_client()

        log_groups = []
        paginator = client.get_paginator("describe_log_groups")

        for page in paginator.paginate():
            for group in page["logGroups"]:
                group_name = group["logGroupName"]
                # One extra API call per group to discover its fields.
                fields = client.get_log_group_fields(logGroupName=group_name)
                log_groups.append(
                    {
                        "name": group_name,
                        "columns": [
                            field["name"] for field in fields["logGroupFields"]
                        ],
                    }
                )

        return log_groups

    def run_query(self, query, user):
        """Start an Insights query and poll until it completes or times out.

        Raises ``Exception`` when the service reports a terminal failure
        status or when polling exceeds ``TIMEOUT`` seconds.
        """
        logs = self._get_client()

        query = parse_query(query)
        query_id = logs.start_query(**query)["queryId"]

        elapsed = 0
        while True:
            result = logs.get_query_results(queryId=query_id)
            status = result["status"]
            if status == "Complete":
                data = parse_response(result)
                break
            if status in ("Failed", "Timeout", "Unknown", "Cancelled"):
                raise Exception(
                    "CloudWatch Insights Query Execution Status: {}".format(
                        status
                    )
                )
            if elapsed > TIMEOUT:
                # Best-effort cancel: without this the abandoned query keeps
                # running (and scanning logs) server-side after we give up.
                try:
                    logs.stop_query(queryId=query_id)
                except Exception:
                    pass
                raise Exception("Request exceeded timeout.")
            time.sleep(POLL_INTERVAL)
            elapsed += POLL_INTERVAL

        return json_dumps(data), None


register(CloudWatchInsights)
2 changes: 2 additions & 0 deletions redash/settings/__init__.py
Expand Up @@ -302,6 +302,8 @@ def email_server_is_configured():
'redash.query_runner.cass',
'redash.query_runner.dgraph',
'redash.query_runner.azure_kusto',
'redash.query_runner.cloudwatch',
'redash.query_runner.cloudwatch_insights',
]

enabled_query_runners = array_from_string(os.environ.get("REDASH_ENABLED_QUERY_RUNNERS", ",".join(default_query_runners)))
Expand Down