Merged
Commits
49 commits
0a83644
[skip ci] elasticsearch support init: structure and skeleton code
Aug 31, 2021
947119f
[skip ci] rename elasticsearch->opensearch
Sep 10, 2021
4534d7a
[skip ci] merge Assaf and Murali forks
Sep 14, 2021
4e8f4e3
[skip ci] fixed filter_path pandasticsearch issue
Sep 14, 2021
7a010cd
[skip ci] disable scan for now
Sep 14, 2021
4bfbfd6
Merge branch 'main' of https://github.com/awslabs/aws-data-wrangler i…
Sep 14, 2021
79e0a9a
[skip ci] path documentation
Sep 15, 2021
f07e698
[skip ci] add delete_index
Sep 15, 2021
7d7318b
[skip ci] add delete_index
Sep 15, 2021
6b90c93
[skip ci] add index_json
Sep 15, 2021
73db6f5
[skip ci] add index_csv local path
Sep 15, 2021
15d8aca
[skip ci] add is_scroll to search (scan)
Sep 17, 2021
e01b1a0
[skip ci] add search_by_sql
Sep 17, 2021
1e1fe37
[skip ci] opensearch test infra
Sep 28, 2021
d574341
[skip ci] index create/delete ignore exceptions
Sep 28, 2021
7bb6779
[skip ci] index_documents documents type
Sep 28, 2021
75a2701
[skip ci] removed pandasticsearch dependency
Sep 28, 2021
cea9abb
[skip ci] port typo
Sep 28, 2021
f6c7dd4
[skip ci] enforced_pandas_params
Sep 28, 2021
517a3a6
Merge branch 'main' of https://github.com/awslabs/aws-data-wrangler i…
Sep 28, 2021
030e21c
[skip ci] isort & black
Sep 28, 2021
950231d
Added OpenSearch tutorial
mureddy29 Sep 28, 2021
9829755
typing fixes
Sep 28, 2021
0120e31
[skip ci] isort
Sep 28, 2021
b4700f6
[skip ci] black opensearch
Sep 28, 2021
51b8110
[skip ci] opensearch validation
Sep 28, 2021
cdf7dc7
Merge branch 'main' of https://github.com/awslabs/aws-data-wrangler i…
Sep 28, 2021
39457fc
[skip ci] opensearch: poetry add requests-aws4auth and elasticsearch
Sep 29, 2021
7be5062
[skip ci] opensearch: add support for host with schema http/https
Sep 29, 2021
cb8656c
Update 031 - OpenSearch.ipynb
mureddy29 Sep 29, 2021
22b5e9b
[skip ci] opensearch: index_documents 429 error
Sep 30, 2021
c5092a2
[skip ci] opensearch: add jsonpath_ng library
Sep 30, 2021
8bd8985
Merge branch 'elasticsearch-support' of https://github.com/AssafMentz…
Sep 30, 2021
97a35bd
[skip ci] opensearch: renamed fgac user/password
Sep 30, 2021
a73d875
[skip ci] opensearch: add connection timeout
Sep 30, 2021
ed7a57c
opensearch: get_credentials_from_session
Oct 1, 2021
aaf8943
Merge branch 'main' of https://github.com/awslabs/aws-data-wrangler i…
Oct 1, 2021
545e163
[skip ci] opensearch: indexing progressbar
Oct 1, 2021
6042ae4
[skip ci] opensearch.index_documents.max_retries default 5
Oct 3, 2021
c53cd6f
opensearch: replace elasticsearch-py with opensearch-py low-level client
Oct 4, 2021
5c5d717
[skip ci] opensearch filter_path default value
Oct 5, 2021
152c407
[skip ci] opensearch tutorial
Oct 5, 2021
419a5ce
Merge branch 'main' into elasticsearch-support
jaidisido Oct 5, 2021
808cf09
Merge branch 'main' into elasticsearch-support
jaidisido Oct 5, 2021
53dff4b
Minor - Pylint
jaidisido Oct 5, 2021
c6e6d80
[skip ci] opensearch: pylint f-string and file open encoding
Oct 5, 2021
2307100
[skip ci] opensearch: pylint f-string
Oct 5, 2021
29f892c
opensearch: add to CONTRIBUTING.md
Oct 6, 2021
827c3bf
opensearch: update aws-cdk packages to have the same minimum version
Oct 6, 2021
6 changes: 5 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -215,6 +215,10 @@ or
``./deploy-base.sh``
``./deploy-databases.sh``

* [OPTIONAL] Deploy the CloudFormation template `opensearch.yaml` (if you need to test Amazon OpenSearch Service). Deployment can take about 15 minutes.

``./deploy-opensearch.sh``

* Go to the `EC2 -> SecurityGroups` console, open the `aws-data-wrangler-*` security group and configure to accept your IP from any TCP port.
- Alternatively run:

@@ -244,7 +248,7 @@ or

``pytest -n 8 tests/test_db.py``

- * To run all data lake test functions for all python versions (Only if Amazon QuickSight is activated):
+ * To run all data lake test functions for all python versions (Only if Amazon QuickSight is activated and Amazon OpenSearch template is deployed):

``./test.sh``

1 change: 1 addition & 0 deletions README.md
@@ -136,6 +136,7 @@ FROM "sampleDB"."sampleTable" ORDER BY time DESC LIMIT 3
- [026 - Amazon Timestream](https://github.com/awslabs/aws-data-wrangler/blob/main/tutorials/026%20-%20Amazon%20Timestream.ipynb)
- [027 - Amazon Timestream 2](https://github.com/awslabs/aws-data-wrangler/blob/main/tutorials/027%20-%20Amazon%20Timestream%202.ipynb)
- [028 - Amazon DynamoDB](https://github.com/awslabs/aws-data-wrangler/blob/main/tutorials/028%20-%20DynamoDB.ipynb)
- [031 - OpenSearch](https://github.com/awslabs/aws-data-wrangler/blob/main/tutorials/031%20-%20OpenSearch.ipynb)
- [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/2.11.0/api.html)
- [Amazon S3](https://aws-data-wrangler.readthedocs.io/en/2.11.0/api.html#amazon-s3)
- [AWS Glue Catalog](https://aws-data-wrangler.readthedocs.io/en/2.11.0/api.html#aws-glue-catalog)
2 changes: 2 additions & 0 deletions awswrangler/__init__.py
@@ -17,6 +17,7 @@
emr,
exceptions,
mysql,
opensearch,
postgresql,
quicksight,
redshift,
@@ -38,6 +39,7 @@
"data_api",
"dynamodb",
"exceptions",
"opensearch",
"quicksight",
"s3",
"sts",
17 changes: 17 additions & 0 deletions awswrangler/opensearch/__init__.py
@@ -0,0 +1,17 @@
"""Utilities Module for Amazon OpenSearch."""

from awswrangler.opensearch._read import search, search_by_sql
from awswrangler.opensearch._utils import connect
from awswrangler.opensearch._write import create_index, delete_index, index_csv, index_df, index_documents, index_json

__all__ = [
    "connect",
    "create_index",
    "delete_index",
    "index_csv",
    "index_documents",
    "index_df",
    "index_json",
    "search",
    "search_by_sql",
]
169 changes: 169 additions & 0 deletions awswrangler/opensearch/_read.py
@@ -0,0 +1,169 @@
"""Amazon OpenSearch Read Module (PRIVATE)."""

from typing import Any, Collection, Dict, List, Mapping, Optional, Union

import pandas as pd
from opensearchpy import OpenSearch
from opensearchpy.helpers import scan

from awswrangler.opensearch._utils import _get_distribution


def _resolve_fields(row: Mapping[str, Any]) -> Mapping[str, Any]:
    fields = {}
    for field in row:
        if isinstance(row[field], dict):
            nested_fields = _resolve_fields(row[field])
            for n_field, val in nested_fields.items():
                fields[f"{field}.{n_field}"] = val
        else:
            fields[field] = row[field]
    return fields


def _hit_to_row(hit: Mapping[str, Any]) -> Mapping[str, Any]:
    row: Dict[str, Any] = {}
    for k in hit.keys():
        if k == "_source":
            solved_fields = _resolve_fields(hit["_source"])
            row.update(solved_fields)
        elif k.startswith("_"):
            row[k] = hit[k]
    return row


def _search_response_to_documents(response: Mapping[str, Any]) -> List[Mapping[str, Any]]:
    return [_hit_to_row(hit) for hit in response["hits"]["hits"]]


def _search_response_to_df(response: Union[Mapping[str, Any], Any]) -> pd.DataFrame:
    return pd.DataFrame(_search_response_to_documents(response))
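
The helpers above turn each search hit into a flat row: nested `_source` objects become dotted column names, metadata keys starting with `_` are kept, and other top-level keys are dropped. A minimal standalone sketch of that behavior follows; the names `flatten`, `hit_to_row`, and `sample_hit` are hypothetical and only mirror the logic of `_resolve_fields` and `_hit_to_row`, they are not part of the PR.

```python
from typing import Any, Dict, Mapping


def flatten(row: Mapping[str, Any]) -> Dict[str, Any]:
    """Flatten nested dicts into dotted keys (illustrative stand-in for _resolve_fields)."""
    fields: Dict[str, Any] = {}
    for key, value in row.items():
        if isinstance(value, dict):
            # Recurse, then prefix every nested key with the parent key.
            for nested_key, nested_value in flatten(value).items():
                fields[f"{key}.{nested_key}"] = nested_value
        else:
            fields[key] = value
    return fields


def hit_to_row(hit: Mapping[str, Any]) -> Dict[str, Any]:
    """Convert one hit to a flat row (illustrative stand-in for _hit_to_row)."""
    row: Dict[str, Any] = {}
    for key, value in hit.items():
        if key == "_source":
            row.update(flatten(value))
        elif key.startswith("_"):
            row[key] = value  # keep metadata such as _id or _index
    return row


# Hypothetical hit, shaped like one entry of response["hits"]["hits"].
sample_hit = {
    "_id": "1",
    "_source": {"title": "Gone with the Wind", "director": {"name": "Victor Fleming"}},
    "sort": [1],  # no leading underscore, so it is dropped
}
row = hit_to_row(sample_hit)
```

Passing a list of such rows to `pd.DataFrame` would then yield one column per dotted key, which is what `_search_response_to_df` relies on.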


def search(
    client: OpenSearch,
    index: Optional[str] = "_all",
    search_body: Optional[Dict[str, Any]] = None,
    doc_type: Optional[str] = None,
    is_scroll: Optional[bool] = False,
    filter_path: Optional[Union[str, Collection[str]]] = None,
    **kwargs: Any,
) -> pd.DataFrame:
    """Return results matching query DSL as pandas dataframe.

    Parameters
    ----------
    client : OpenSearch
        instance of opensearchpy.OpenSearch to use.
    index : str, optional
        A comma-separated list of index names to search.
        Use `_all` or an empty string to perform the operation on all indices.
    search_body : Dict[str, Any], optional
        The search definition using the [Query DSL](https://opensearch.org/docs/opensearch/query-dsl/full-text/).
    doc_type : str, optional
        Name of the document type (for Elasticsearch versions 5.x and earlier).
    is_scroll : bool, optional
        Allows retrieving a large number of results from a single search request using
        [scroll](https://opensearch.org/docs/opensearch/rest-api/scroll/),
        for example, for machine learning jobs.
        Because scroll search contexts consume a lot of memory, we suggest you don’t use the scroll operation
        for frequent user queries.
    filter_path : Union[str, Collection[str]], optional
        Use the filter_path parameter to reduce the size of the OpenSearch Service response \
(default: ['hits.hits._id','hits.hits._source'])
    **kwargs :
        KEYWORD arguments forwarded to [opensearchpy.OpenSearch.search]\
(https://opensearch-py.readthedocs.io/en/latest/api.html#opensearchpy.OpenSearch.search)
        and also to [opensearchpy.helpers.scan](https://opensearch-py.readthedocs.io/en/master/helpers.html#scan)
        if `is_scroll=True`

    Returns
    -------
    pandas.DataFrame
        Results as Pandas DataFrame

    Examples
    --------
    Searching an index using query DSL

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> df = wr.opensearch.search(
    ...     client=client,
    ...     index='movies',
    ...     search_body={
    ...         "query": {
    ...             "match": {
    ...                 "title": "wind"
    ...             }
    ...         }
    ...     }
    ... )


    """
    if doc_type:
        kwargs["doc_type"] = doc_type

    if filter_path is None:
        filter_path = ["hits.hits._id", "hits.hits._source"]

    if is_scroll:
        if isinstance(filter_path, str):
            filter_path = [filter_path]
        filter_path = ["_scroll_id", "_shards"] + list(filter_path)  # required for scroll
        documents_generator = scan(client, index=index, query=search_body, filter_path=filter_path, **kwargs)
        documents = [_hit_to_row(doc) for doc in documents_generator]
        df = pd.DataFrame(documents)
    else:
        response = client.search(index=index, body=search_body, filter_path=filter_path, **kwargs)
        df = _search_response_to_df(response)
    return df
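
When `is_scroll=True`, `search` normalizes `filter_path` to a list and prepends `_scroll_id` and `_shards`, since the scroll helper needs those response fields. The sketch below isolates that normalization so it can be checked without a cluster; `scroll_filter_path` is a hypothetical helper name, not something defined in this PR.

```python
from typing import Collection, List, Optional, Union

# Default taken from the search() body in this diff.
DEFAULT_FILTER_PATH = ["hits.hits._id", "hits.hits._source"]


def scroll_filter_path(filter_path: Optional[Union[str, Collection[str]]] = None) -> List[str]:
    """Return the filter_path list used for a scroll search (illustrative only)."""
    if filter_path is None:
        filter_path = DEFAULT_FILTER_PATH
    if isinstance(filter_path, str):
        # A single string is treated as a one-element list.
        filter_path = [filter_path]
    # These two fields must survive filtering for scrolling to work.
    return ["_scroll_id", "_shards"] + list(filter_path)


default_paths = scroll_filter_path()
single_path = scroll_filter_path("hits.hits._source.title")
```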


def search_by_sql(client: OpenSearch, sql_query: str, **kwargs: Any) -> pd.DataFrame:
    """Return results matching [SQL query](https://opensearch.org/docs/search-plugins/sql/index/) as pandas dataframe.

    Parameters
    ----------
    client : OpenSearch
        instance of opensearchpy.OpenSearch to use.
    sql_query : str
        SQL query
    **kwargs :
        KEYWORD arguments forwarded to request url (e.g.: filter_path, etc.)

    Returns
    -------
    pandas.DataFrame
        Results as Pandas DataFrame

    Examples
    --------
    Searching an index using SQL query

    >>> import awswrangler as wr
    >>> client = wr.opensearch.connect(host='DOMAIN-ENDPOINT')
    >>> df = wr.opensearch.search_by_sql(
    ...     client=client,
    ...     sql_query='SELECT * FROM my-index LIMIT 50'
    ... )


    """
    if _get_distribution(client) == "opensearch":
        url = "/_plugins/_sql"
    else:
        url = "/_opendistro/_sql"

    kwargs["format"] = "json"
    body = {"query": sql_query}
    for size_att in ["size", "fetch_size"]:
        if size_att in kwargs:
            body["fetch_size"] = kwargs[size_att]
            del kwargs[size_att]  # unrecognized parameter
    response = client.transport.perform_request(
        "POST", url, headers={"Content-Type": "application/json"}, body=body, params=kwargs
    )
    df = _search_response_to_df(response)
    return df
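
`search_by_sql` picks the SQL plugin endpoint from the cluster distribution and moves `size` or `fetch_size` out of the URL parameters into the request body. The following sketch reproduces only that request-building step, under the assumption that it can be separated from the transport call; `sql_endpoint` and `build_sql_request` are hypothetical names used for illustration.

```python
from typing import Any, Dict, Tuple


def sql_endpoint(distribution: str) -> str:
    """Pick the SQL plugin URL the same way the diff does."""
    return "/_plugins/_sql" if distribution == "opensearch" else "/_opendistro/_sql"


def build_sql_request(sql_query: str, **kwargs: Any) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (body, params) for the SQL request (illustrative only)."""
    params = dict(kwargs)  # copy so the caller's kwargs are not mutated
    params["format"] = "json"
    body: Dict[str, Any] = {"query": sql_query}
    for size_att in ("size", "fetch_size"):
        if size_att in params:
            # Both spellings end up as fetch_size in the body, not in the URL.
            body["fetch_size"] = params.pop(size_att)
    return body, params


body, params = build_sql_request("SELECT * FROM movies", size=100, filter_path="hits")
```

Note that if both `size` and `fetch_size` are passed, the loop order means `fetch_size` wins, which matches the behavior of the loop in the diff.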
108 changes: 108 additions & 0 deletions awswrangler/opensearch/_utils.py
@@ -0,0 +1,108 @@
"""Amazon OpenSearch Utils Module (PRIVATE)."""

import logging
import re
from typing import Any, Optional

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth

from awswrangler import _utils, exceptions

_logger: logging.Logger = logging.getLogger(__name__)


def _get_distribution(client: OpenSearch) -> Any:
    return client.info().get("version", {}).get("distribution", "elasticsearch")


def _get_version(client: OpenSearch) -> Any:
    return client.info().get("version", {}).get("number")


def _get_version_major(client: OpenSearch) -> Any:
    version = _get_version(client)
    if version:
        return int(version.split(".")[0])
    return None

Review comment on `_get_version_major`:
Contributor: Is this needed for anything?
Author: Yes, in the create_index method of opensearch/_write.py, to support the deprecation of ES mapping types.


def _strip_endpoint(endpoint: str) -> str:
    uri_schema = re.compile(r"https?://")
    return uri_schema.sub("", endpoint).strip().strip("/")


def connect(
    host: str,
    port: Optional[int] = 443,
    boto3_session: Optional[boto3.Session] = boto3.Session(),
    region: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
) -> OpenSearch:
    """Create a secure connection to the specified Amazon OpenSearch domain.

    Note
    ----
    We use [opensearch-py](https://github.com/opensearch-project/opensearch-py), an OpenSearch low-level python client.

    The username and password are mandatory if the OS Cluster uses [Fine Grained Access Control]\
(https://docs.aws.amazon.com/opensearch-service/latest/developerguide/fgac.html).
    If fine grained access control is disabled, the session access key and secret key are used.

    Parameters
    ----------
    host : str
        Amazon OpenSearch domain, for example: my-test-domain.us-east-1.es.amazonaws.com.
    port : int
        OpenSearch Service only accepts connections over port 80 (HTTP) or 443 (HTTPS)
    boto3_session : boto3.Session(), optional
        Boto3 Session. The default boto3 Session will be used if boto3_session receives None.
    region :
        AWS region of the Amazon OS domain. If not provided, it will be extracted from boto3_session.
    username :
        Fine-grained access control username. Mandatory if OS Cluster uses Fine Grained Access Control.
    password :
        Fine-grained access control password. Mandatory if OS Cluster uses Fine Grained Access Control.

    Returns
    -------
    opensearchpy.OpenSearch
        OpenSearch low-level client.
        https://github.com/opensearch-project/opensearch-py/blob/main/opensearchpy/client/__init__.py
    """
    valid_ports = {80, 443}

    if port not in valid_ports:
        raise ValueError(f"results: port must be one of {valid_ports}")

    if username and password:
        http_auth = (username, password)
    else:
        if region is None:
            region = _utils.get_region_from_session(boto3_session=boto3_session)
        creds = _utils.get_credentials_from_session(boto3_session=boto3_session)
        if creds.access_key is None or creds.secret_key is None:
            raise exceptions.InvalidArgument(
                "One of IAM Role or AWS ACCESS_KEY_ID and SECRET_ACCESS_KEY must be "
                "given. Unable to find ACCESS_KEY_ID and SECRET_ACCESS_KEY in boto3 "
                "session."
            )
        http_auth = AWS4Auth(creds.access_key, creds.secret_key, region, "es", session_token=creds.token)
    try:
        es = OpenSearch(
            host=_strip_endpoint(host),
            port=port,
            http_auth=http_auth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection,
            timeout=30,
            max_retries=10,
            retry_on_timeout=True,
        )
    except Exception as e:
        _logger.error("Error connecting to OpenSearch cluster. Please verify authentication details")
        raise e
    return es
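
Two small pieces of this module can be exercised without a cluster: the endpoint cleanup that lets `connect` accept a host with or without an `http://` or `https://` scheme, and the major-version parsing behind `_get_version_major`. The sketch below restates both as standalone functions; `strip_endpoint` and `version_major` are hypothetical names that mirror the private helpers and take plain strings instead of a client.

```python
import re
from typing import Optional


def strip_endpoint(endpoint: str) -> str:
    """Drop the URI scheme, surrounding whitespace, and leading/trailing slashes."""
    return re.sub(r"https?://", "", endpoint).strip().strip("/")


def version_major(version: Optional[str]) -> Optional[int]:
    """Return the major version from a string such as '7.10.2', or None if absent."""
    if version:
        return int(version.split(".")[0])
    return None


# Example domain taken from the connect() docstring.
host = strip_endpoint("https://my-test-domain.us-east-1.es.amazonaws.com/")
major = version_major("7.10.2")
```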