Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for cryptography api #719

Merged
merged 10 commits into from
Jul 2, 2024
105 changes: 105 additions & 0 deletions dapr/aio/clients/grpc/_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import io
from typing import Union

from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._helpers import to_bytes
from dapr.clients.grpc._request import DaprRequest
from dapr.proto import api_v1, common_v1


class CryptoRequestIterator(DaprRequest):
"""An asynchronous iterator for cryptography API requests.

This reads data from a given stream by chunks and converts it to an asynchronous iterator
of cryptography API requests.

This iterator will be used for encrypt and decrypt gRPC bidirectional streaming requests.
"""

def __init__(
self,
data: Union[str, bytes],
options: Union[EncryptOptions, DecryptOptions],
request_type: Union[api_v1.EncryptRequest, api_v1.DecryptRequest],
):
"""Initialize CryptoRequestIterator with data and encryption/decryption options.

Args:
data (Union[str, bytes]): data to be encrypted or decrypted
options (Union[EncryptOptions, DecryptOptions]): encryption or decryption options
request_type (Union[api_v1.EncryptRequest, api_v1.DecryptRequest]): cryptography API request type
"""
self.data = io.BytesIO(to_bytes(data))
self.options = options.get_proto()
self.request_type = request_type
self.buffer_size = 2 << 10 # 2KiB
self.seq = 0

def __aiter__(self):
"""Returns the iterator object itself."""
return self

async def __anext__(self):
"""Read the next chunk of data from the input stream and create a gRPC stream request."""
# Read data from the input stream, in chunks of up to 2KB
# Send the data until we reach the end of the input stream
chunk = self.data.read(self.buffer_size)
if not chunk:
raise StopAsyncIteration

payload = common_v1.StreamPayload(data=chunk, seq=self.seq)
if self.seq == 0:
# If this is the first chunk, add the options
request_proto = self.request_type(payload=payload, options=self.options)
else:
request_proto = self.request_type(payload=payload)

self.seq += 1
return request_proto


class EncryptRequestIterator(CryptoRequestIterator):
"""An asynchronous iterator for encrypt API request.

This inherits from CryptoRequestIterator.
"""

def __init__(self, data: Union[str, bytes], options: EncryptOptions):
"""Initialize EncryptRequestIterator with data and options.

Args:
data (Union[str, bytes]): data to be encrypted
options (EncryptOptions): encryption options
"""
super().__init__(data, options, api_v1.EncryptRequest)


class DecryptRequestIterator(CryptoRequestIterator):
"""An asynchronous iterator for decrypt API requests.

This inherits from CryptoRequestIterator.
"""

def __init__(self, data: Union[str, bytes], options: DecryptOptions):
"""Initialize DecryptRequestIterator with data and options.

Args:
data (Union[str, bytes]): data to be decrypted
options (DecryptOptions): decryption options
"""
super().__init__(data, options, api_v1.DecryptRequest)
91 changes: 91 additions & 0 deletions dapr/aio/clients/grpc/_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import AsyncGenerator, Generic

from dapr.proto import api_v1
from dapr.clients.grpc._response import DaprResponse, TCryptoResponse


class CryptoResponse(DaprResponse, Generic[TCryptoResponse]):
"""An asynchronous iterable of cryptography API responses."""

def __init__(self, stream: AsyncGenerator[TCryptoResponse, None]):
"""Initialize a CryptoResponse.

Args:
stream (AsyncGenerator[TCryptoResponse, None, None]): A stream of cryptography API responses.
"""
self._stream = stream
self._buffer = bytearray()
self._expected_seq = 0

async def __aiter__(self) -> AsyncGenerator[bytes, None]:
"""Read the next chunk of data from the stream.

Yields:
bytes: The payload data of the next chunk from the stream.

Raises:
ValueError: If the sequence number of the next chunk is incorrect.
"""
async for chunk in self._stream:
if chunk.payload.seq != self._expected_seq:
raise ValueError('invalid sequence number in chunk')
self._expected_seq += 1
yield chunk.payload.data

async def read(self, size: int = -1) -> bytes:
"""Read bytes from the stream.

If size is -1, the entire stream is read and returned as bytes.
Otherwise, up to `size` bytes are read from the stream and returned.
If the stream ends before `size` bytes are available, the remaining
bytes are returned.

Args:
size (int): The maximum number of bytes to read. If -1 (the default),
read until the end of the stream.

Returns:
bytes: The bytes read from the stream.
"""
if size == -1:
# Read the entire stream
return b''.join([chunk async for chunk in self])

# Read the requested number of bytes
data = bytes(self._buffer)
self._buffer.clear()

async for chunk in self:
data += chunk
if len(data) >= size:
break

# Update the buffer
remaining = data[size:]
self._buffer.extend(remaining)

# Return the requested number of bytes
return data[:size]


class EncryptResponse(CryptoResponse[api_v1.EncryptResponse]):
...


class DecryptResponse(CryptoResponse[api_v1.DecryptResponse]):
...
100 changes: 100 additions & 0 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
)

from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
Expand All @@ -56,6 +57,14 @@
validateNotNone,
validateNotBlankString,
)
from dapr.aio.clients.grpc._request import (
EncryptRequestIterator,
DecryptRequestIterator,
)
from dapr.aio.clients.grpc._response import (
EncryptResponse,
DecryptResponse,
)
from dapr.clients.grpc._request import (
InvokeMethodRequest,
BindingRequest,
Expand Down Expand Up @@ -1214,6 +1223,97 @@ async def unlock(self, store_name: str, resource_id: str, lock_owner: str) -> Un
status=UnlockResponseStatus(response.status), headers=await call.initial_metadata()
)

async def encrypt(self, data: Union[str, bytes], options: EncryptOptions):
"""Encrypt a stream data with given options.

The encrypt API encrypts a stream data with the given options.

Example:
from dapr.aio.clients import DaprClient
from dapr.clients.grpc._crypto import EncryptOptions

async with DaprClient() as d:
options = EncryptOptions(
component_name='crypto_component',
key_name='crypto_key',
key_wrap_algorithm='RSA',
)
resp = await d.encrypt(
data='hello dapr',
options=options,
)
encrypted_data = await resp.read()

Args:
data (Union[str, bytes]): Data to be encrypted.
options (EncryptOptions): Encryption options.

Returns:
Readable stream of `api_v1.EncryptResponse`.

Raises:
ValueError: If component_name, key_name, or key_wrap_algorithm is empty.
"""
# Warnings and input validation
warn(
'The Encrypt API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(
component_name=options.component_name,
key_name=options.key_name,
key_wrap_algorithm=options.key_wrap_algorithm,
)

req_iterator = EncryptRequestIterator(data, options)
resp_stream = self._stub.EncryptAlpha1(req_iterator)
return EncryptResponse(resp_stream)

async def decrypt(self, data: Union[str, bytes], options: DecryptOptions):
"""Decrypt a stream data with given options.

The decrypt API decrypts a stream data with the given options.

Example:
from dapr.aio.clients import DaprClient
from dapr.clients.grpc._crypto import DecryptOptions

async with DaprClient() as d:
options = DecryptOptions(
component_name='crypto_component',
key_name='crypto_key',
)
resp = await d.decrypt(
data='hello dapr',
options=options,
)
decrypted_data = await resp.read()

Args:
data (Union[str, bytes]): Data to be decrypted.
options (DecryptOptions): Decryption options.

Returns:
Readable stream of `api_v1.DecryptResponse`.

Raises:
ValueError: If component_name is empty.
"""
# Warnings and input validation
warn(
'The Decrypt API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(
component_name=options.component_name,
)

req_iterator = DecryptRequestIterator(data, options)
resp_stream = self._stub.DecryptAlpha1(req_iterator)
return DecryptResponse(resp_stream)

async def start_workflow(
self,
workflow_component: str,
Expand Down
71 changes: 71 additions & 0 deletions dapr/clients/grpc/_crypto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from dataclasses import dataclass

from dapr.proto import api_v1


@dataclass
class EncryptOptions:
"""EncryptOptions contains options passed to the encrypt method.

Args:
component_name (str): The name of the component.
key_name (str): The name of the key to use for the encryption operation.
key_wrap_algorithm (str): The key wrap algorithm to use.
data_encryption_cipher (str, optional): The cipher to use for the encryption operation.
omit_decryption_key_name (bool, optional): If True, omits the decryption key name from
header `dapr-decryption-key-name` from the output. If False, includes the specified
decryption key name specified in header `dapr-decryption-key-name`.
decryption_key_name (str, optional): If `dapr-omit-decryption-key-name` is True, this
contains the name of the intended decryption key to include in the output.
"""

component_name: str
key_name: str
key_wrap_algorithm: str
data_encryption_cipher: str = 'aes-gcm'
omit_decryption_key_name: bool = False
decryption_key_name: str = ''

def get_proto(self) -> api_v1.EncryptRequestOptions:
return api_v1.EncryptRequestOptions(
component_name=self.component_name,
key_name=self.key_name,
key_wrap_algorithm=self.key_wrap_algorithm,
data_encryption_cipher=self.data_encryption_cipher,
omit_decryption_key_name=self.omit_decryption_key_name,
decryption_key_name=self.decryption_key_name,
)


@dataclass
class DecryptOptions:
"""DecryptOptions contains options passed to the decrypt method.

Args:
component_name (str): The name of the component.
key_name (str, optional): The name of the key to use for the decryption operation.
"""

component_name: str
key_name: str = ''

def get_proto(self) -> api_v1.DecryptRequestOptions:
return api_v1.DecryptRequestOptions(
component_name=self.component_name,
key_name=self.key_name,
)
Loading
Loading