Merge pull request #18 from awslabs/cloudwatchlogs
Add cloudwatchlogs module and bump version to 0.0.1! 🚀
igorborgest committed Sep 7, 2019
2 parents 6c1fe73 + 0a40bef commit 17635e7
Showing 22 changed files with 623 additions and 51 deletions.
72 changes: 55 additions & 17 deletions README.md
@@ -1,4 +1,4 @@
# AWS Data Wrangler (beta)
# AWS Data Wrangler

> Utility belt to handle data on AWS.
@@ -14,16 +14,28 @@

## Use Cases

* Pandas -> Parquet (S3)
* Pandas -> CSV (S3)
### Pandas
* Pandas -> Parquet (S3) (Parallel :rocket:)
* Pandas -> CSV (S3) (Parallel :rocket:)
* Pandas -> Glue Catalog
* Pandas -> Athena
* Pandas -> Redshift
* Pandas -> Athena (Parallel :rocket:)
* Pandas -> Redshift (Parallel :rocket:)
* CSV (S3) -> Pandas (One shot or Batching)
* Athena -> Pandas (One shot or Batching)
* PySpark -> Redshift
* Delete S3 objects (parallel :rocket:)
* Encrypt S3 data with KMS keys
* CloudWatch Logs Insights -> Pandas (NEW :star:)
* Encrypt Pandas Dataframes on S3 with KMS keys (NEW :star:)

### PySpark
* PySpark -> Redshift (Parallel :rocket:) (NEW :star:)

### General
* List S3 objects (Parallel :rocket:)
* Delete S3 objects (Parallel :rocket:)
* Delete listed S3 objects (Parallel :rocket:)
* Delete NOT listed S3 objects (Parallel :rocket:)
* Copy listed S3 objects (Parallel :rocket:)
* Get the size of S3 objects (Parallel :rocket:)
* Get CloudWatch Logs Insights query results (NEW :star:)

## Installation

@@ -37,7 +49,9 @@ Runs anywhere (AWS Lambda, AWS Glue, EMR, EC2, on-premises, local, etc).

## Examples

### Writing Pandas Dataframe to S3 + Glue Catalog
### Pandas

#### Writing Pandas Dataframe to S3 + Glue Catalog

```py3
session = awswrangler.Session()
```
@@ -51,7 +65,7 @@ session.pandas.to_parquet(

If a Glue Database name is passed, all the metadata will be created in the Glue Catalog. If not, only the S3 data write will be done.
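
The diff collapses the body of this example; below is a minimal sketch of the full call, with placeholder data, database name, and path (not the commit's exact code):

```py3
import pandas
import awswrangler

dataframe = pandas.DataFrame({"col_name": [1, 2]})  # hypothetical data
session = awswrangler.Session()
session.pandas.to_parquet(
    dataframe=dataframe,
    database="database",  # hypothetical Glue database name
    path="s3://...",
)
```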

### Writing Pandas Dataframe to S3 as Parquet encrypting with a KMS key
#### Writing Pandas Dataframe to S3 as Parquet encrypting with a KMS key

```py3
extra_args = {
@@ -64,7 +78,7 @@ session.pandas.to_parquet(
)
```
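
The collapsed middle of the block above hides the encryption arguments; a sketch of what the full example plausibly looks like, assuming the KMS settings are passed through the session's `s3_additional_kwargs` and using a placeholder key ARN:

```py3
import awswrangler

extra_args = {
    "ServerSideEncryption": "aws:kms",
    "SSEKMSKeyId": "YOUR_KMS_KEY_ARN",  # placeholder key ARN
}
session = awswrangler.Session(s3_additional_kwargs=extra_args)
session.pandas.to_parquet(
    dataframe=dataframe,  # a Pandas DataFrame built earlier
    path="s3://...",
)
```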

### Reading from AWS Athena to Pandas
#### Reading from AWS Athena to Pandas

```py3
session = awswrangler.Session()
@@ -74,7 +88,7 @@ dataframe = session.pandas.read_sql_athena(
)
```
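
Filling in the collapsed arguments, a minimal sketch with placeholder SQL and database name:

```py3
import awswrangler

session = awswrangler.Session()
dataframe = session.pandas.read_sql_athena(
    sql="select * from table",  # placeholder query
    database="database",        # placeholder Athena/Glue database
)
```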

### Reading from AWS Athena to Pandas in chunks (For memory restrictions)
#### Reading from AWS Athena to Pandas in chunks (For memory restrictions)

```py3
session = awswrangler.Session()
@@ -87,14 +101,14 @@ for dataframe in dataframe_iter:
print(dataframe) # Do whatever you want
```
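
A sketch of the full chunked read, assuming the batch size is controlled by a `max_result_size` argument (the 512 MB value is illustrative):

```py3
import awswrangler

session = awswrangler.Session()
dataframe_iter = session.pandas.read_sql_athena(
    sql="select * from table",    # placeholder query
    database="database",
    max_result_size=512_000_000,  # hypothetical batch ceiling (~512 MB)
)
for dataframe in dataframe_iter:
    print(dataframe)  # Do whatever you want
```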

### Reading from S3 (CSV) to Pandas
#### Reading from S3 (CSV) to Pandas

```py3
session = awswrangler.Session()
dataframe = session.pandas.read_csv(path="s3://...")
```

### Reading from S3 (CSV) to Pandas in chunks (For memory restrictions)
#### Reading from S3 (CSV) to Pandas in chunks (For memory restrictions)

```py3
session = awswrangler.Session()
@@ -106,7 +120,17 @@ for dataframe in dataframe_iter:
print(dataframe) # Do whatever you want
```
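
Likewise for CSV, a sketch assuming the same `max_result_size` knob (values are placeholders):

```py3
import awswrangler

session = awswrangler.Session()
dataframe_iter = session.pandas.read_csv(
    path="s3://...",
    max_result_size=512_000_000,  # hypothetical batch ceiling (~512 MB)
)
for dataframe in dataframe_iter:
    print(dataframe)  # Do whatever you want
```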

### Typical Pandas ETL
#### Reading from CloudWatch Logs Insights to Pandas

```py3
session = awswrangler.Session()
dataframe = session.pandas.read_log_query(
log_group_names=[LOG_GROUP_NAME],
query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)
```

#### Typical Pandas ETL

```py3
import pandas
@@ -125,7 +149,9 @@ session.pandas.to_parquet(  # Storing the data and metadata to Data Lake
)
```
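
A sketch of the full ETL example with placeholder source, database, path, and partition column:

```py3
import pandas
import awswrangler

dataframe = pandas.read_csv("data.csv")  # hypothetical source; read from anywhere
# Typical Pandas, Numpy or Pyarrow transformation HERE

session = awswrangler.Session()
session.pandas.to_parquet(  # Storing the data and metadata to the Data Lake
    dataframe=dataframe,
    database="database",
    path="s3://...",
    partition_cols=["col_name"],  # hypothetical partition column
)
```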

### Loading Pyspark Dataframe to Redshift
### PySpark

#### Loading PySpark Dataframe to Redshift

```py3
session = awswrangler.Session(spark_session=spark)
@@ -140,13 +166,25 @@ session.spark.to_redshift(
)
```
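
A sketch of the full call with placeholder connection, role, and staging path; it assumes a SparkSession named `spark` is already available, as on EMR or Glue:

```py3
import awswrangler

session = awswrangler.Session(spark_session=spark)
session.spark.to_redshift(
    dataframe=dataframe,               # a PySpark DataFrame built earlier
    path="s3://...",                   # staging path for the load
    connection="REDSHIFT_CONNECTION",  # placeholder Glue connection name
    schema="public",
    table="table",
    iam_role="IAM_ROLE_ARN",           # placeholder role for the Redshift COPY
    mode="append",
)
```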

### Deleting a bunch of S3 objects
### General

#### Deleting a bunch of S3 objects (Parallel :rocket:)

```py3
session = awswrangler.Session()
session.s3.delete_objects(path="s3://...")
```

#### Get CloudWatch Logs Insights query results

```py3
session = awswrangler.Session()
results = session.cloudwatchlogs.query(
log_group_names=[LOG_GROUP_NAME],
query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)
```

## Diving Deep

### Pandas to Redshift Flow
1 change: 1 addition & 0 deletions awswrangler/__init__.py
@@ -6,6 +6,7 @@
from awswrangler.pandas import Pandas # noqa
from awswrangler.s3 import S3 # noqa
from awswrangler.athena import Athena # noqa
from awswrangler.cloudwatchlogs import CloudWatchLogs # noqa
from awswrangler.glue import Glue # noqa
from awswrangler.redshift import Redshift # noqa
import awswrangler.utils # noqa
2 changes: 1 addition & 1 deletion awswrangler/__version__.py
@@ -1,4 +1,4 @@
__title__ = "awswrangler"
__description__ = "Utility belt to handle data on AWS."
__version__ = "0.0b32"
__version__ = "0.0.1"
__license__ = "Apache License 2.0"
23 changes: 20 additions & 3 deletions awswrangler/athena.py
@@ -1,7 +1,7 @@
from time import sleep
import logging

from awswrangler.exceptions import UnsupportedType
from awswrangler.exceptions import UnsupportedType, QueryFailed, QueryCancelled

logger = logging.getLogger(__name__)

@@ -86,12 +86,29 @@ def run_query(self, query, database, s3_output=None):
return response["QueryExecutionId"]

def wait_query(self, query_execution_id):
"""
Wait until the query ends
:param query_execution_id: Query execution ID
:return: Query response
"""
final_states = ["FAILED", "SUCCEEDED", "CANCELLED"]
response = self._client_athena.get_query_execution(
QueryExecutionId=query_execution_id)
while (response.get("QueryExecution").get("Status").get("State") not in
final_states):
state = response["QueryExecution"]["Status"]["State"]
while state not in final_states:
sleep(QUERY_WAIT_POLLING_DELAY)
response = self._client_athena.get_query_execution(
QueryExecutionId=query_execution_id)
state = response["QueryExecution"]["Status"]["State"]
logger.debug(f"state: {state}")
logger.debug(
f"StateChangeReason: {response['QueryExecution']['Status'].get('StateChangeReason')}"
)
if state == "FAILED":
raise QueryFailed(
response["QueryExecution"]["Status"].get("StateChangeReason"))
elif state == "CANCELLED":
raise QueryCancelled(
response["QueryExecution"]["Status"].get("StateChangeReason"))
return response
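
For orientation, a hypothetical end-to-end use of the two methods above, relying only on the signatures visible in this diff (`run_query(query, database, s3_output=None)` and `wait_query(query_execution_id)`, which now raises `QueryFailed` or `QueryCancelled`):

```py3
import awswrangler

session = awswrangler.Session()
query_execution_id = session.athena.run_query(
    query="SELECT 1",    # hypothetical query
    database="default",  # hypothetical database
)
# Blocks until a final state; raises QueryFailed or QueryCancelled otherwise
response = session.athena.wait_query(query_execution_id=query_execution_id)
print(response["QueryExecution"]["Status"]["State"])  # "SUCCEEDED"
```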
94 changes: 94 additions & 0 deletions awswrangler/cloudwatchlogs.py
@@ -0,0 +1,94 @@
from time import sleep
from datetime import datetime
import logging

from awswrangler.exceptions import QueryFailed, QueryCancelled

logger = logging.getLogger(__name__)

QUERY_WAIT_POLLING_DELAY = 0.2  # SECONDS


class CloudWatchLogs:
def __init__(self, session):
self._session = session
self._client_logs = session.boto3_session.client(
service_name="logs", config=session.botocore_config)

def start_query(self,
query,
log_group_names,
start_time=datetime(year=1970, month=1, day=1),
end_time=datetime.utcnow(),
limit=None):
"""
Start a query against AWS CloudWatchLogs Insights and return its query ID
https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html
:param query: The query string to use.
:param log_group_names: The list of log groups to be queried. You can include up to 20 log groups.
:param start_time: The beginning of the time range to query (datetime.datetime object)
:param end_time: The end of the time range to query (datetime.datetime object)
:param limit: The maximum number of log events to return in the query.
:return: Query ID
"""
start_timestamp = int(1000 * start_time.timestamp())
end_timestamp = int(1000 * end_time.timestamp())
logger.debug(f"start_timestamp: {start_timestamp}")
logger.debug(f"end_timestamp: {end_timestamp}")
args = {
"logGroupNames": log_group_names,
"startTime": start_timestamp,
"endTime": end_timestamp,
"queryString": query
}
if limit:
args["limit"] = limit
response = self._client_logs.start_query(**args)
return response["queryId"]

def wait_query(self, query_id):
"""
Wait until the query ends
:param query_id: Query ID
:return: Query results
"""
final_states = ["Complete", "Failed", "Cancelled"]
response = self._client_logs.get_query_results(queryId=query_id)
status = response["status"]
while status not in final_states:
sleep(QUERY_WAIT_POLLING_DELAY)
response = self._client_logs.get_query_results(queryId=query_id)
status = response["status"]
logger.debug(f"status: {status}")
if status == "Failed":
raise QueryFailed(f"query ID: {query_id}")
elif status == "Cancelled":
raise QueryCancelled(f"query ID: {query_id}")
return response

def query(self,
query,
log_group_names,
start_time=datetime(year=1970, month=1, day=1),
end_time=datetime.utcnow(),
limit=None):
"""
Run a query against AWS CloudWatchLogs Insights and wait for the results
https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html
:param query: The query string to use.
:param log_group_names: The list of log groups to be queried. You can include up to 20 log groups.
:param start_time: The beginning of the time range to query (datetime.datetime object)
:param end_time: The end of the time range to query (datetime.datetime object)
:param limit: The maximum number of log events to return in the query.
:return: Results
"""
query_id = self.start_query(query=query,
log_group_names=log_group_names,
start_time=start_time,
end_time=end_time,
limit=limit)
response = self.wait_query(query_id=query_id)
return response["results"]
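
A sketch of driving the lower-level pair directly instead of `query()`; the log group name and time window are made up, and passing `end_time` explicitly sidesteps the `datetime.utcnow()` default above, which Python evaluates once at import time:

```py3
from datetime import datetime, timedelta

import awswrangler

session = awswrangler.Session()
query_id = session.cloudwatchlogs.start_query(
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    log_group_names=["/aws/lambda/my-function"],  # hypothetical log group
    start_time=datetime.utcnow() - timedelta(hours=1),
    end_time=datetime.utcnow(),
)
response = session.cloudwatchlogs.wait_query(query_id=query_id)
results = response["results"]
```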
8 changes: 8 additions & 0 deletions awswrangler/exceptions.py
@@ -56,3 +56,11 @@ class InvalidRedshiftSortkey(Exception):

class EmptyDataframe(Exception):
pass


class QueryCancelled(Exception):
pass


class QueryFailed(Exception):
pass
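
The two new exceptions are raised by the wait helpers added in this commit; a hypothetical caller might handle them like this (the log group is a placeholder):

```py3
import awswrangler
from awswrangler.exceptions import QueryCancelled, QueryFailed

session = awswrangler.Session()
try:
    results = session.cloudwatchlogs.query(
        query="fields @timestamp, @message | limit 1",
        log_group_names=["/aws/lambda/my-function"],  # hypothetical log group
    )
except QueryFailed as error:
    print(f"Query failed: {error}")
except QueryCancelled:
    print("Query was cancelled before it finished")
```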
35 changes: 35 additions & 0 deletions awswrangler/pandas.py
@@ -4,6 +4,7 @@
from math import floor
import copy
import csv
from datetime import datetime

import pandas
import pyarrow
@@ -833,3 +834,37 @@ def to_redshift(
mode=mode,
)
self._session.s3.delete_objects(path=path)

def read_log_query(self,
query,
log_group_names,
start_time=datetime(year=1970, month=1, day=1),
end_time=datetime.utcnow(),
limit=None):
"""
Run a query against AWS CloudWatchLogs Insights and convert the results to a Pandas DataFrame
:param query: The query string to use. https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html
:param log_group_names: The list of log groups to be queried. You can include up to 20 log groups.
:param start_time: The beginning of the time range to query (datetime.datetime object)
:param end_time: The end of the time range to query (datetime.datetime object)
:param limit: The maximum number of log events to return in the query. If the query string uses the fields command, only the specified fields and their values are returned.
:return: Results as a Pandas DataFrame
"""
results = self._session.cloudwatchlogs.query(
query=query,
log_group_names=log_group_names,
start_time=start_time,
end_time=end_time,
limit=limit)
pre_df = []
for row in results:
new_row = {}
for col in row:
if col["field"].startswith("@"):
col_name = col["field"].replace("@", "", 1)
else:
col_name = col["field"]
new_row[col_name] = col["value"]
pre_df.append(new_row)
return pandas.DataFrame(pre_df)
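
For illustration: Logs Insights returns each row as a list of field/value dicts, and the loop above flattens each row into one record while stripping a single leading `@` from field names. A made-up example of that transformation:

```py3
# Hypothetical raw result for one row, as returned by get_query_results
row = [
    {"field": "@timestamp", "value": "2019-09-07 00:00:00.000"},
    {"field": "@message", "value": "hello"},
]
# After the loop above, this row becomes:
# {"timestamp": "2019-09-07 00:00:00.000", "message": "hello"}
```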
8 changes: 8 additions & 0 deletions awswrangler/session.py
@@ -7,6 +7,7 @@

from awswrangler.s3 import S3
from awswrangler.athena import Athena
from awswrangler.cloudwatchlogs import CloudWatchLogs
from awswrangler.pandas import Pandas
from awswrangler.glue import Glue
from awswrangler.redshift import Redshift
@@ -88,6 +89,7 @@ def __init__(
self._load_new_boto3_session()
self._s3 = None
self._athena = None
self._cloudwatchlogs = None
self._pandas = None
self._glue = None
self._redshift = None
@@ -202,6 +204,12 @@ def athena(self):
self._athena = Athena(session=self)
return self._athena

@property
def cloudwatchlogs(self):
if not self._cloudwatchlogs:
self._cloudwatchlogs = CloudWatchLogs(session=self)
return self._cloudwatchlogs

@property
def pandas(self):
if not self._pandas:
2 changes: 1 addition & 1 deletion building/Dockerfile
@@ -1,4 +1,4 @@
FROM lambci/lambda:build-python3.6
FROM lambci/lambda:build-python3.7

RUN pip install --upgrade pip

