Skip to content

Commit

Permalink
Merge pull request #1617 from frsann/cosmosdb-tasks
Browse files Browse the repository at this point in the history
Cosmosdb tasks
  • Loading branch information
joshmeek authored Oct 16, 2019
2 parents 70ef07a + 252c95b commit 3d32720
Show file tree
Hide file tree
Showing 9 changed files with 511 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ These changes are available in the [master branch](https://github.com/PrefectHQ/
### Task Library

- Add `return_all` kwarg to `ShellTask` for optionally returning all lines of stdout - [#1598](https://github.com/PrefectHQ/prefect/pull/1598)
- Add `CosmosDBCreateItem`, `CosmosDBReadItems`, and `CosmosDBQueryItems` tasks for interacting with data stored on Azure Cosmos DB - [#1617](https://github.com/PrefectHQ/prefect/pull/1617)

### Fixes

Expand Down
18 changes: 18 additions & 0 deletions docs/core/task_library/azure.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,21 @@ Task for downloading data from an Blob Storage container and returning it as a s
Task for uploading string data (e.g., a JSON string) to Blob Storage. Note that all initialization arguments can optionally be provided or overwritten at runtime.

[API Reference](/api/unreleased/tasks/azure.html#prefect-tasks-azure-blobstorage-blobstorageupload)

## CosmosDBCreateItem <Badge text="task"/>

Task for creating an item in a Cosmos database. Note that all initialization arguments can optionally be provided or overwritten at runtime.

[API Reference](/api/unreleased/tasks/azure.html#prefect-tasks-azure-cosmosdb-cosmosdbcreateitem)

## CosmosDBReadItems <Badge text="task"/>

Task for reading items from an Azure Cosmos database. Note that all initialization arguments can optionally be provided or overwritten at runtime.

[API Reference](/api/unreleased/tasks/azure.html#prefect-tasks-azure-cosmosdb-cosmosdbreaditems)

## CosmosDBQueryItems <Badge text="task"/>

Task for querying items from an Azure Cosmos database. Note that all initialization arguments can optionally be provided or overwritten at runtime.

[API Reference](/api/unreleased/tasks/azure.html#prefect-tasks-azure-cosmosdb-cosmosdbqueryitems)
6 changes: 5 additions & 1 deletion docs/outline.toml
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,11 @@ classes = ["S3Download", "S3Upload", "LambdaCreate", "LambdaDelete" , "LambdaInv
[pages.tasks.azure]
title = "Azure Tasks"
module = "prefect.tasks.azure"
classes = ["BlobStorageDownload", "BlobStorageUpload"]
classes = ["BlobStorageDownload",
"BlobStorageUpload",
"CosmosDBCreateItem",
"CosmosDBReadItems",
"CosmosDBQueryItems"]

[pages.tasks.azureml]
title = "Azure ML Service Tasks"
Expand Down
6 changes: 5 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
extras = {
"airtable": ["airtable-python-wrapper >= 0.11, < 0.12"],
"aws": ["boto3 >= 1.9, < 2.0"],
"azure": ["azure-storage-blob >= 2.1.0, < 3.0", "azureml-sdk >= 1.0.65, < 1.1"],
"azure": [
"azure-storage-blob >= 2.1.0, < 3.0",
"azureml-sdk >= 1.0.65, < 1.1",
"azure-cosmos >= 3.1.1, <3.2",
],
"dev": dev_requires,
"dropbox": ["dropbox ~= 9.0"],
"google": [
Expand Down
8 changes: 5 additions & 3 deletions src/prefect/tasks/azure/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""
This module contains a collection of tasks for interacting with Azure resources.
Note that all tasks require a Prefect Secret called `"AZ_CREDENTIALS"` that should be a JSON
document with two keys: `"ACCOUNT_NAME"` and either `"ACCOUNT_KEY"` or `"SAS_TOKEN"`.
"""

try:
from prefect.tasks.azure.blobstorage import BlobStorageDownload, BlobStorageUpload
from prefect.tasks.azure.cosmosdb import (
CosmosDBCreateItem,
CosmosDBReadItems,
CosmosDBQueryItems,
)
except ImportError:
raise ImportError(
'Using `prefect.tasks.azure` requires Prefect to be installed with the "azure" extra.'
Expand Down
314 changes: 314 additions & 0 deletions src/prefect/tasks/azure/cosmosdb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,314 @@
from typing import Dict, Any, Union, List

import azure.cosmos.cosmos_client
from azure.cosmos.query_iterable import QueryIterable


from prefect import Task
from prefect.client import Secret
from prefect.utilities.tasks import defaults_from_attrs


class CosmosDBCreateItem(Task):
    """
    Task that creates a single item in an Azure Cosmos database.

    Every initialization argument is kept as a default and may be supplied or
    replaced when the task is run.

    Args:
        - url (str, optional): the url to the database; handed to the Cosmos client
            as `url_connection`.
        - database_or_container_link (str, optional): link to the database or container
            in which the item is created.
        - item (dict, optional): the item to create.
        - azure_credentials_secret (str, optional): name of the Prefect Secret holding
            your Azure credentials (default `"AZ_CREDENTIALS"`). The secret value must
            contain the key `AZ_COSMOS_AUTH`, whose value is handed to the Cosmos client
            as `auth`: a dictionary containing `masterKey` (the default authorization
            key) or `resourceTokens` (the alternative authorization key).
        - options (dict, optional): options forwarded to
            `azure.cosmos.cosmos_client.CosmosClient.CreateItem`.
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor.
    """

    def __init__(
        self,
        url: str = None,
        database_or_container_link: str = None,
        item: Dict = None,
        azure_credentials_secret: str = "AZ_CREDENTIALS",
        options: Dict[Any, Any] = None,
        **kwargs
    ) -> None:
        # Stored on the instance so `defaults_from_attrs` can fall back to them in `run`.
        self.url = url
        self.database_or_container_link = database_or_container_link
        self.item = item
        self.azure_credentials_secret = azure_credentials_secret
        self.options = options
        super().__init__(**kwargs)

    @defaults_from_attrs(
        "url",
        "database_or_container_link",
        "item",
        "azure_credentials_secret",
        "options",
    )
    def run(
        self,
        url: str = None,
        database_or_container_link: str = None,
        item: Dict = None,
        azure_credentials_secret: str = "AZ_CREDENTIALS",
        options: Dict[Any, Any] = None,
    ) -> Dict[Any, Any]:
        """
        Create the item and return what the Cosmos client reports back.

        Args:
            - url (str, optional): the url to the database.
            - database_or_container_link (str, optional): link to the database or
                container in which the item is created.
            - item (dict, optional): the item to create.
            - azure_credentials_secret (str, optional): name of the Prefect Secret
                holding your Azure credentials; its value must contain the key
                `AZ_COSMOS_AUTH` (a dictionary with `masterKey` or `resourceTokens`).
            - options (dict, optional): options forwarded to
                `azure.cosmos.cosmos_client.CosmosClient.CreateItem`.

        Returns:
            - (dict): the created item.

        Raises:
            - ValueError: if `url`, `database_or_container_link` or `item` is `None`.
        """
        # Checked in this order so the first missing argument is the one reported.
        required = (
            (url, "A url must be provided."),
            (
                database_or_container_link,
                "A database or container link must be provided.",
            ),
            (item, "An item must be provided."),
        )
        for value, message in required:
            if value is None:
                raise ValueError(message)

        secret_value = Secret(azure_credentials_secret).get()
        cosmos_auth = secret_value["AZ_COSMOS_AUTH"]

        cosmos = azure.cosmos.cosmos_client.CosmosClient(
            url_connection=url, auth=cosmos_auth
        )

        return cosmos.CreateItem(database_or_container_link, item, options=options)


class CosmosDBReadItems(Task):
    """
    Task that reads one document, or every document of a container, from an
    Azure Cosmos database.

    Every initialization argument is kept as a default and may be supplied or
    replaced when the task is run.

    Args:
        - url (str, optional): the url to the database; handed to the Cosmos client
            as `url_connection`.
        - document_or_container_link (str, optional): link to a document or container.
            A link shaped like `dbs/<db>/colls/<coll>/docs/<doc>` is treated as a
            document link and that document is returned; any other link is read
            as a container and all of its documents are returned.
        - azure_credentials_secret (str, optional): name of the Prefect Secret holding
            your Azure credentials (default `"AZ_CREDENTIALS"`). The secret value must
            contain the key `AZ_COSMOS_AUTH`, whose value is handed to the Cosmos client
            as `auth`: a dictionary containing `masterKey` (the default authorization
            key) or `resourceTokens` (the alternative authorization key).
        - options (dict, optional): options forwarded to the
            `azure.cosmos.cosmos_client.CosmosClient.ReadItem` or `ReadItems` method.
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor.
    """

    def __init__(
        self,
        url: str = None,
        document_or_container_link: str = None,
        azure_credentials_secret: str = "AZ_CREDENTIALS",
        options: Dict[Any, Any] = None,
        **kwargs
    ) -> None:
        # Stored on the instance so `defaults_from_attrs` can fall back to them in `run`.
        self.url = url
        self.document_or_container_link = document_or_container_link
        self.azure_credentials_secret = azure_credentials_secret
        self.options = options

        super().__init__(**kwargs)

    @defaults_from_attrs(
        "url", "document_or_container_link", "azure_credentials_secret", "options"
    )
    def run(
        self,
        url: str = None,
        document_or_container_link: str = None,
        azure_credentials_secret: str = "AZ_CREDENTIALS",
        options: Dict[Any, Any] = None,
    ) -> Union[Dict[Any, Any], List[Dict[Any, Any]]]:
        """
        Read a single document or all documents behind the given link.

        Args:
            - url (str, optional): the url to the database.
            - document_or_container_link (str, optional): link to a document or
                container; a document link yields that document, anything else
                yields all documents.
            - azure_credentials_secret (str, optional): name of the Prefect Secret
                holding your Azure credentials; its value must contain the key
                `AZ_COSMOS_AUTH` (a dictionary with `masterKey` or `resourceTokens`).
            - options (dict, optional): options forwarded to the
                `azure.cosmos.cosmos_client.CosmosClient.ReadItem` or `ReadItems` method.

        Returns:
            - (dict or list): a single document or a list of all documents.

        Raises:
            - ValueError: if `url` or `document_or_container_link` is `None`.
        """
        if url is None:
            raise ValueError("A url must be provided.")
        if document_or_container_link is None:
            raise ValueError("A document or container link must be provided.")

        secret_value = Secret(azure_credentials_secret).get()
        cosmos_auth = secret_value["AZ_COSMOS_AUTH"]

        cosmos = azure.cosmos.cosmos_client.CosmosClient(
            url_connection=url, auth=cosmos_auth
        )

        if not self._is_valid_document_link(document_or_container_link):
            # Not a document link: read the whole feed and materialize it as a list.
            documents = cosmos.ReadItems(
                document_or_container_link, feed_options=options
            )
            return list(documents)

        return cosmos.ReadItem(document_or_container_link, options=options)

    @staticmethod
    def _is_valid_document_link(link: str) -> bool:
        """
        Tell whether `link` has the form `dbs/<db>/colls/<coll>/docs/<doc>`.

        Leading and trailing slashes are ignored before the link is split.
        """
        segments = link.strip("/").split("/")
        if len(segments) != 6:
            return False
        return (segments[0], segments[2], segments[4]) == ("dbs", "colls", "docs")


class CosmosDBQueryItems(Task):
    """
    Task for querying items from an Azure Cosmos database.

    Note that all initialization arguments can optionally be provided or overwritten
    at runtime.

    Args:
        - url (str, optional): the url to the database; handed to the Cosmos client
            as `url_connection`.
        - database_or_container_link (str, optional): link to the database or container
            to query.
        - query (str or dict, optional): the query to run, forwarded unchanged to
            `CosmosClient.QueryItems`.
        - azure_credentials_secret (str, optional): the name of the Prefect Secret
            that stores your Azure credentials (default `"AZ_CREDENTIALS"`). The secret
            value must contain the key `AZ_COSMOS_AUTH`, whose value is handed to the
            Cosmos client as `auth`: a dictionary containing `masterKey` (the default
            authorization key) or `resourceTokens` (the alternative authorization key).
        - options (dict, optional): options to be passed to the
            `azure.cosmos.cosmos_client.CosmosClient.QueryItems` method.
        - partition_key (str, optional): partition key for the query.
        - **kwargs (dict, optional): additional keyword arguments to pass to the
            Task constructor.
    """

    def __init__(
        self,
        url: str = None,
        database_or_container_link: str = None,
        query: Union[str, Dict[str, Any]] = None,
        azure_credentials_secret: str = "AZ_CREDENTIALS",
        options: Dict[Any, Any] = None,
        partition_key: str = None,
        **kwargs
    ) -> None:
        # Stored on the instance so `defaults_from_attrs` can fall back to them in `run`.
        self.url = url
        self.database_or_container_link = database_or_container_link
        self.query = query
        self.azure_credentials_secret = azure_credentials_secret
        self.options = options
        self.partition_key = partition_key
        super().__init__(**kwargs)

    @defaults_from_attrs(
        "url",
        "database_or_container_link",
        "query",
        "azure_credentials_secret",
        "options",
        "partition_key",
    )
    def run(
        self,
        url: str = None,
        database_or_container_link: str = None,
        query: Union[str, Dict[str, Any]] = None,
        azure_credentials_secret: str = "AZ_CREDENTIALS",
        options: Dict[Any, Any] = None,
        partition_key: str = None,
    ) -> List:
        """
        Task run method: execute the query and return all results as a list.

        Args:
            - url (str, optional): the url to the database.
            - database_or_container_link (str, optional): link to the database or
                container to query.
            - query (str or dict, optional): the query to run, forwarded unchanged to
                `CosmosClient.QueryItems`.
            - azure_credentials_secret (str, optional): the name of the Prefect Secret
                that stores your Azure credentials; its value must contain the key
                `AZ_COSMOS_AUTH` (a dictionary with `masterKey` or `resourceTokens`).
            - options (dict, optional): options to be passed to the
                `azure.cosmos.cosmos_client.CosmosClient.QueryItems` method.
            - partition_key (str, optional): partition key for the query.

        Returns:
            - (list): a list containing the query results, one item per row.

        Raises:
            - ValueError: if `url`, `database_or_container_link` or `query` is `None`.
        """

        if url is None:
            raise ValueError("A url must be provided.")

        if database_or_container_link is None:
            raise ValueError("A database or container link must be provided.")

        if query is None:
            raise ValueError("A query must be provided.")

        azure_credentials = Secret(azure_credentials_secret).get()
        auth_dict = azure_credentials["AZ_COSMOS_AUTH"]

        client = azure.cosmos.cosmos_client.CosmosClient(
            url_connection=url, auth=auth_dict
        )

        items = client.QueryItems(
            database_or_container_link,
            query,
            options=options,
            partition_key=partition_key,
        )

        # QueryItems hands back an iterable; materialize it so the task returns a
        # plain list.
        return list(items)
1 change: 1 addition & 0 deletions tests/tasks/azure/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest

# Skip instead of failing when the Azure SDK modules the tasks import
# (`azure.storage.blob`, `azure.cosmos`) cannot be imported.
pytest.importorskip("azure.storage.blob")
pytest.importorskip("azure.cosmos")
Loading

0 comments on commit 3d32720

Please sign in to comment.