Add cloudwatchlogs module and bump version to 0.0.1! 🚀 #18

Merged
merged 1 commit on Sep 7, 2019
72 changes: 55 additions & 17 deletions README.md
@@ -1,4 +1,4 @@
# AWS Data Wrangler (beta)
# AWS Data Wrangler

> Utility belt to handle data on AWS.

@@ -14,16 +14,28 @@

## Use Cases

* Pandas -> Parquet (S3)
* Pandas -> CSV (S3)
### Pandas
* Pandas -> Parquet (S3) (Parallel :rocket:)
* Pandas -> CSV (S3) (Parallel :rocket:)
* Pandas -> Glue Catalog
* Pandas -> Athena
* Pandas -> Redshift
* Pandas -> Athena (Parallel :rocket:)
* Pandas -> Redshift (Parallel :rocket:)
* CSV (S3) -> Pandas (One shot or Batching)
* Athena -> Pandas (One shot or Batching)
* PySpark -> Redshift
* Delete S3 objects (parallel :rocket:)
* Encrypt S3 data with KMS keys
* CloudWatch Logs Insights -> Pandas (NEW :star:)
* Encrypt Pandas Dataframes on S3 with KMS keys (NEW :star:)

### PySpark
* PySpark -> Redshift (Parallel :rocket:) (NEW :star:)

### General
* List S3 objects (Parallel :rocket:)
* Delete S3 objects (Parallel :rocket:)
* Delete listed S3 objects (Parallel :rocket:)
* Delete NOT listed S3 objects (Parallel :rocket:)
* Copy listed S3 objects (Parallel :rocket:)
* Get the size of S3 objects (Parallel :rocket:)
* Get CloudWatch Logs Insights query results (NEW :star:)

## Installation

@@ -37,7 +49,9 @@ Runs anywhere (AWS Lambda, AWS Glue, EMR, EC2, on-premises, local, etc).

## Examples

### Writing Pandas Dataframe to S3 + Glue Catalog
### Pandas

#### Writing Pandas Dataframe to S3 + Glue Catalog

```py3
session = awswrangler.Session()
@@ -51,7 +65,7 @@ session.pandas.to_parquet(

If a Glue database name is passed, all the metadata will be created in the Glue Catalog. If not, only the S3 data write is performed.
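
As an illustrative sketch of the two behaviors (not part of this diff); the keyword arguments shown (`dataframe`, `database`, `path`) and the bucket and database names are assumptions, since the full call is collapsed in this diff:

```py3
session = awswrangler.Session()

# With a database name: writes the Parquet files to S3 AND registers the
# table metadata in the Glue Catalog.
session.pandas.to_parquet(
    dataframe=dataframe,
    database="my_database",  # placeholder
    path="s3://my-bucket/my-table/",  # placeholder
)

# Without a database name: only the S3 write is performed.
session.pandas.to_parquet(
    dataframe=dataframe,
    path="s3://my-bucket/my-table/",  # placeholder
)
```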

### Writing Pandas Dataframe to S3 as Parquet encrypting with a KMS key
#### Writing Pandas Dataframe to S3 as Parquet encrypting with a KMS key

```py3
extra_args = {
@@ -64,7 +78,7 @@ session.pandas.to_parquet(
)
```

### Reading from AWS Athena to Pandas
#### Reading from AWS Athena to Pandas

```py3
session = awswrangler.Session()
@@ -74,7 +88,7 @@ dataframe = session.pandas.read_sql_athena(
)
```

### Reading from AWS Athena to Pandas in chunks (For memory restrictions)
#### Reading from AWS Athena to Pandas in chunks (For memory restrictions)

```py3
session = awswrangler.Session()
@@ -87,14 +101,14 @@ for dataframe in dataframe_iter:
print(dataframe) # Do whatever you want
```

### Reading from S3 (CSV) to Pandas
#### Reading from S3 (CSV) to Pandas

```py3
session = awswrangler.Session()
dataframe = session.pandas.read_csv(path="s3://...")
```

### Reading from S3 (CSV) to Pandas in chunks (For memory restrictions)
#### Reading from S3 (CSV) to Pandas in chunks (For memory restrictions)

```py3
session = awswrangler.Session()
@@ -106,7 +120,17 @@ for dataframe in dataframe_iter:
print(dataframe) # Do whatever you want
```

### Typical Pandas ETL
#### Reading from CloudWatch Logs Insights to Pandas

```py3
session = awswrangler.Session()
dataframe = session.pandas.read_log_query(
log_group_names=[LOG_GROUP_NAME],
query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)
```

#### Typical Pandas ETL

```py3
import pandas
@@ -125,7 +149,9 @@ session.pandas.to_parquet( # Storing the data and metadata to Data Lake
)
```

### Loading Pyspark Dataframe to Redshift
### PySpark

#### Loading PySpark Dataframe to Redshift

```py3
session = awswrangler.Session(spark_session=spark)
@@ -140,13 +166,25 @@ session.spark.to_redshift(
)
```

### Deleting a bunch of S3 objects
### General

#### Deleting a bunch of S3 objects (parallel :rocket:)

```py3
session = awswrangler.Session()
session.s3.delete_objects(path="s3://...")
```

#### Get CloudWatch Logs Insights query results

```py3
session = awswrangler.Session()
results = session.cloudwatchlogs.query(
log_group_names=[LOG_GROUP_NAME],
query="fields @timestamp, @message | sort @timestamp desc | limit 5",
)
```

## Diving Deep

### Pandas to Redshift Flow
1 change: 1 addition & 0 deletions awswrangler/__init__.py
@@ -6,6 +6,7 @@
from awswrangler.pandas import Pandas # noqa
from awswrangler.s3 import S3 # noqa
from awswrangler.athena import Athena # noqa
from awswrangler.cloudwatchlogs import CloudWatchLogs # noqa
from awswrangler.glue import Glue # noqa
from awswrangler.redshift import Redshift # noqa
import awswrangler.utils # noqa
2 changes: 1 addition & 1 deletion awswrangler/__version__.py
@@ -1,4 +1,4 @@
__title__ = "awswrangler"
__description__ = "Utility belt to handle data on AWS."
__version__ = "0.0b32"
__version__ = "0.0.1"
__license__ = "Apache License 2.0"
23 changes: 20 additions & 3 deletions awswrangler/athena.py
@@ -1,7 +1,7 @@
from time import sleep
import logging

from awswrangler.exceptions import UnsupportedType
from awswrangler.exceptions import UnsupportedType, QueryFailed, QueryCancelled

logger = logging.getLogger(__name__)

@@ -86,12 +86,29 @@ def run_query(self, query, database, s3_output=None):
return response["QueryExecutionId"]

def wait_query(self, query_execution_id):
"""
Wait until the query reaches a final state

:param query_execution_id: Query execution ID
:return: Query response
"""
final_states = ["FAILED", "SUCCEEDED", "CANCELLED"]
response = self._client_athena.get_query_execution(
QueryExecutionId=query_execution_id)
while (response.get("QueryExecution").get("Status").get("State") not in
final_states):
state = response["QueryExecution"]["Status"]["State"]
while state not in final_states:
sleep(QUERY_WAIT_POLLING_DELAY)
response = self._client_athena.get_query_execution(
QueryExecutionId=query_execution_id)
state = response["QueryExecution"]["Status"]["State"]
logger.debug(f"state: {state}")
logger.debug(
f"StateChangeReason: {response['QueryExecution']['Status'].get('StateChangeReason')}"
)
if state == "FAILED":
raise QueryFailed(
response["QueryExecution"]["Status"].get("StateChangeReason"))
elif state == "CANCELLED":
raise QueryCancelled(
response["QueryExecution"]["Status"].get("StateChangeReason"))
return response
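
With the final-state handling added above, callers can now distinguish failed from cancelled Athena queries. A usage sketch (not part of this diff); the query string and database name are placeholders:

```py3
import awswrangler
from awswrangler.exceptions import QueryCancelled, QueryFailed

session = awswrangler.Session()
query_execution_id = session.athena.run_query(
    query="SELECT 1",        # placeholder query
    database="my_database",  # placeholder database
)
try:
    session.athena.wait_query(query_execution_id=query_execution_id)
except QueryFailed as e:
    print(f"Query failed: {e}")  # message carries Athena's StateChangeReason
except QueryCancelled as e:
    print(f"Query cancelled: {e}")
```
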
94 changes: 94 additions & 0 deletions awswrangler/cloudwatchlogs.py
@@ -0,0 +1,94 @@
from time import sleep
from datetime import datetime
import logging

from awswrangler.exceptions import QueryFailed, QueryCancelled

logger = logging.getLogger(__name__)

QUERY_WAIT_POLLING_DELAY = 0.2  # SECONDS


class CloudWatchLogs:
def __init__(self, session):
self._session = session
self._client_logs = session.boto3_session.client(
service_name="logs", config=session.botocore_config)

def start_query(self,
query,
log_group_names,
start_time=datetime(year=1970, month=1, day=1),
end_time=datetime.utcnow(),
limit=None):
"""
Start a query against AWS CloudWatch Logs Insights (does not wait for the results)
https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

:param query: The query string to use.
:param log_group_names: The list of log groups to be queried. You can include up to 20 log groups.
:param start_time: The beginning of the time range to query (datetime.datetime object)
:param end_time: The end of the time range to query (datetime.datetime object)
:param limit: The maximum number of log events to return in the query.
:return: Query ID
"""
start_timestamp = int(1000 * start_time.timestamp())
end_timestamp = int(1000 * end_time.timestamp())
logger.debug(f"start_timestamp: {start_timestamp}")
logger.debug(f"end_timestamp: {end_timestamp}")
args = {
"logGroupNames": log_group_names,
"startTime": start_timestamp,
"endTime": end_timestamp,
"queryString": query
}
if limit:
args["limit"] = limit
response = self._client_logs.start_query(**args)
return response["queryId"]

def wait_query(self, query_id):
"""
Wait until the query reaches a final state

:param query_id: Query ID
:return: Query results
"""
final_states = ["Complete", "Failed", "Cancelled"]
response = self._client_logs.get_query_results(queryId=query_id)
status = response["status"]
while status not in final_states:
sleep(QUERY_WAIT_POLLING_DELAY)
response = self._client_logs.get_query_results(queryId=query_id)
status = response["status"]
logger.debug(f"status: {status}")
if status == "Failed":
raise QueryFailed(f"query ID: {query_id}")
elif status == "Cancelled":
raise QueryCancelled(f"query ID: {query_id}")
return response

def query(self,
query,
log_group_names,
start_time=datetime(year=1970, month=1, day=1),
end_time=datetime.utcnow(),
limit=None):
"""
Run a query against AWS CloudWatch Logs Insights and wait for the results
https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

:param query: The query string to use.
:param log_group_names: The list of log groups to be queried. You can include up to 20 log groups.
:param start_time: The beginning of the time range to query (datetime.datetime object)
:param end_time: The end of the time range to query (datetime.datetime object)
:param limit: The maximum number of log events to return in the query.
:return: Results
"""
query_id = self.start_query(query=query,
log_group_names=log_group_names,
start_time=start_time,
end_time=end_time,
limit=limit)
response = self.wait_query(query_id=query_id)
return response["results"]
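
A usage sketch for the new module (not part of this diff), exercising the optional time window and limit; the log group name is a placeholder:

```py3
from datetime import datetime, timedelta

import awswrangler

session = awswrangler.Session()
results = session.cloudwatchlogs.query(
    query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    log_group_names=["/my/log/group"],  # placeholder
    start_time=datetime.utcnow() - timedelta(hours=1),  # only the last hour
    end_time=datetime.utcnow(),
    limit=5,
)
# `results` is the "results" list from GetQueryResults: one entry per matched
# log event, each a list of {"field": ..., "value": ...} dicts.
```
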
8 changes: 8 additions & 0 deletions awswrangler/exceptions.py
@@ -56,3 +56,11 @@ class InvalidRedshiftSortkey(Exception):

class EmptyDataframe(Exception):
pass


class QueryCancelled(Exception):
pass


class QueryFailed(Exception):
pass
35 changes: 35 additions & 0 deletions awswrangler/pandas.py
@@ -4,6 +4,7 @@
from math import floor
import copy
import csv
from datetime import datetime

import pandas
import pyarrow
@@ -833,3 +834,37 @@ def to_redshift(
mode=mode,
)
self._session.s3.delete_objects(path=path)

def read_log_query(self,
query,
log_group_names,
start_time=datetime(year=1970, month=1, day=1),
end_time=datetime.utcnow(),
limit=None):
"""
Run a query against AWS CloudWatch Logs Insights and convert the results to a Pandas DataFrame

:param query: The query string to use. https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html
:param log_group_names: The list of log groups to be queried. You can include up to 20 log groups.
:param start_time: The beginning of the time range to query (datetime.datetime object)
:param end_time: The end of the time range to query (datetime.datetime object)
:param limit: The maximum number of log events to return in the query. If the query string uses the fields command, only the specified fields and their values are returned.
:return: Results as a Pandas DataFrame
"""
results = self._session.cloudwatchlogs.query(
query=query,
log_group_names=log_group_names,
start_time=start_time,
end_time=end_time,
limit=limit)
pre_df = []
for row in results:
new_row = {}
for col in row:
if col["field"].startswith("@"):
col_name = col["field"].replace("@", "", 1)
else:
col_name = col["field"]
new_row[col_name] = col["value"]
pre_df.append(new_row)
return pandas.DataFrame(pre_df)
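
To make the conversion above concrete, a small self-contained sketch (not part of this diff) with a hand-written `results` payload that mimics the CloudWatch Logs Insights response structure; the timestamp and message values are invented:

```py3
import pandas

# One matched log event = one list of {"field": ..., "value": ...} dicts.
results = [
    [
        {"field": "@timestamp", "value": "2019-09-07 00:00:00.000"},
        {"field": "@message", "value": "hello world"},
    ],
]

pre_df = []
for row in results:
    new_row = {}
    for col in row:
        # Strip a single leading "@": "@timestamp" -> "timestamp"
        if col["field"].startswith("@"):
            col_name = col["field"].replace("@", "", 1)
        else:
            col_name = col["field"]
        new_row[col_name] = col["value"]
    pre_df.append(new_row)

dataframe = pandas.DataFrame(pre_df)  # columns: "timestamp", "message"
```
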
8 changes: 8 additions & 0 deletions awswrangler/session.py
@@ -7,6 +7,7 @@

from awswrangler.s3 import S3
from awswrangler.athena import Athena
from awswrangler.cloudwatchlogs import CloudWatchLogs
from awswrangler.pandas import Pandas
from awswrangler.glue import Glue
from awswrangler.redshift import Redshift
@@ -88,6 +89,7 @@ def __init__(
self._load_new_boto3_session()
self._s3 = None
self._athena = None
self._cloudwatchlogs = None
self._pandas = None
self._glue = None
self._redshift = None
@@ -202,6 +204,12 @@ def athena(self):
self._athena = Athena(session=self)
return self._athena

@property
def cloudwatchlogs(self):
if not self._cloudwatchlogs:
self._cloudwatchlogs = CloudWatchLogs(session=self)
return self._cloudwatchlogs

@property
def pandas(self):
if not self._pandas:
2 changes: 1 addition & 1 deletion building/Dockerfile
@@ -1,4 +1,4 @@
FROM lambci/lambda:build-python3.6
FROM lambci/lambda:build-python3.7

RUN pip install --upgrade pip
