Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for cryptography api #719

Merged
merged 10 commits into from
Jul 2, 2024
116 changes: 116 additions & 0 deletions dapr/aio/clients/grpc/_request.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import io
from typing import Union

from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._helpers import to_bytes
from dapr.clients.grpc._request import DaprRequest
from dapr.proto import api_v1, common_v1


class EncryptRequestIterator(DaprRequest):
"""An asynchronous iterator for cryptography encrypt API requests.

This reads data from a given stream by chunks and converts it to an asynchronous iterator
of cryptography encrypt API requests.
This iterator will be used for encrypt gRPC bidirectional streaming requests.
"""

def __init__(
self,
data: Union[str, bytes],
options: EncryptOptions,
):
"""Initialize EncryptRequestIterator with data and encryption options.

Args:
data (Union[str, bytes]): data to be encrypted
options (EncryptOptions): encryption options
"""
self.data = io.BytesIO(to_bytes(data))
self.options = options.get_proto()
self.buffer_size = 2 << 10 # 2KiB
self.seq = 0

def __aiter__(self):
"""Returns the iterator object itself."""
return self

async def __anext__(self):
"""Read the next chunk of data from the input stream and create a gRPC stream request."""
# Read data from the input stream, in chunks of up to 2KiB
# Send the data until we reach the end of the input stream
chunk = self.data.read(self.buffer_size)
if not chunk:
raise StopAsyncIteration

payload = common_v1.StreamPayload(data=chunk, seq=self.seq)
if self.seq == 0:
# If this is the first chunk, add the options
request_proto = api_v1.EncryptRequest(payload=payload, options=self.options)
else:
request_proto = api_v1.EncryptRequest(payload=payload)

Check warning on line 66 in dapr/aio/clients/grpc/_request.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/_request.py#L66

Added line #L66 was not covered by tests

self.seq += 1
return request_proto


class DecryptRequestIterator(DaprRequest):
"""An asynchronous iterator for cryptography decrypt API requests.

This reads data from a given stream by chunks and converts it to an asynchronous iterator
of cryptography decrypt API requests.
This iterator will be used for encrypt gRPC bidirectional streaming requests.
"""

def __init__(
self,
data: Union[str, bytes],
options: DecryptOptions,
):
"""Initialize DecryptRequestIterator with data and decryption options.

Args:
data (Union[str, bytes]): data to be decrypted
options (DecryptOptions): decryption options
"""
self.data = io.BytesIO(to_bytes(data))
self.options = options.get_proto()
self.buffer_size = 2 << 10 # 2KiB
self.seq = 0

def __aiter__(self):
"""Returns the iterator object itself."""
return self

async def __anext__(self):
"""Read the next chunk of data from the input stream and create a gRPC stream request."""
# Read data from the input stream, in chunks of up to 2KiB
# Send the data until we reach the end of the input stream
chunk = self.data.read(self.buffer_size)
if not chunk:
raise StopAsyncIteration

payload = common_v1.StreamPayload(data=chunk, seq=self.seq)
if self.seq == 0:
# If this is the first chunk, add the options
request_proto = api_v1.DecryptRequest(payload=payload, options=self.options)
else:
request_proto = api_v1.DecryptRequest(payload=payload)

Check warning on line 113 in dapr/aio/clients/grpc/_request.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/_request.py#L113

Added line #L113 was not covered by tests

self.seq += 1
return request_proto
91 changes: 91 additions & 0 deletions dapr/aio/clients/grpc/_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import AsyncGenerator, Generic

from dapr.proto import api_v1
from dapr.clients.grpc._response import DaprResponse, TCryptoResponse


class CryptoResponse(DaprResponse, Generic[TCryptoResponse]):
"""An asynchronous iterable of cryptography API responses."""

def __init__(self, stream: AsyncGenerator[TCryptoResponse, None]):
"""Initialize a CryptoResponse.

Args:
stream (AsyncGenerator[TCryptoResponse, None, None]): A stream of cryptography API responses.
"""
self._stream = stream
self._buffer = bytearray()
self._expected_seq = 0

async def __aiter__(self) -> AsyncGenerator[bytes, None]:
"""Read the next chunk of data from the stream.

Yields:
bytes: The payload data of the next chunk from the stream.

Raises:
ValueError: If the sequence number of the next chunk is incorrect.
"""
async for chunk in self._stream:
if chunk.payload.seq != self._expected_seq:
raise ValueError('invalid sequence number in chunk')

Check warning on line 46 in dapr/aio/clients/grpc/_response.py

View check run for this annotation

Codecov / codecov/patch

dapr/aio/clients/grpc/_response.py#L46

Added line #L46 was not covered by tests
self._expected_seq += 1
yield chunk.payload.data

async def read(self, size: int = -1) -> bytes:
"""Read bytes from the stream.

If size is -1, the entire stream is read and returned as bytes.
Otherwise, up to `size` bytes are read from the stream and returned.
If the stream ends before `size` bytes are available, the remaining
bytes are returned.

Args:
size (int): The maximum number of bytes to read. If -1 (the default),
read until the end of the stream.

Returns:
bytes: The bytes read from the stream.
"""
if size == -1:
# Read the entire stream
return b''.join([chunk async for chunk in self])

# Read the requested number of bytes
data = bytes(self._buffer)
self._buffer.clear()

async for chunk in self:
data += chunk
if len(data) >= size:
break

# Update the buffer
remaining = data[size:]
self._buffer.extend(remaining)

# Return the requested number of bytes
return data[:size]


class EncryptResponse(CryptoResponse[api_v1.EncryptResponse]):
...


class DecryptResponse(CryptoResponse[api_v1.DecryptResponse]):
...
100 changes: 100 additions & 0 deletions dapr/aio/clients/grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
)

from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
from dapr.clients.grpc._state import StateOptions, StateItem
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
from dapr.clients.health import DaprHealth
Expand All @@ -60,6 +61,14 @@
validateNotNone,
validateNotBlankString,
)
from dapr.aio.clients.grpc._request import (
EncryptRequestIterator,
DecryptRequestIterator,
)
from dapr.aio.clients.grpc._response import (
EncryptResponse,
DecryptResponse,
)
from dapr.clients.grpc._request import (
InvokeMethodRequest,
BindingRequest,
Expand Down Expand Up @@ -1227,6 +1236,97 @@ async def unlock(self, store_name: str, resource_id: str, lock_owner: str) -> Un
status=UnlockResponseStatus(response.status), headers=await call.initial_metadata()
)

async def encrypt(self, data: Union[str, bytes], options: EncryptOptions):
"""Encrypt a stream data with given options.

The encrypt API encrypts a stream data with the given options.

Example:
from dapr.aio.clients import DaprClient
from dapr.clients.grpc._crypto import EncryptOptions

async with DaprClient() as d:
options = EncryptOptions(
component_name='crypto_component',
key_name='crypto_key',
key_wrap_algorithm='RSA',
)
resp = await d.encrypt(
data='hello dapr',
options=options,
)
encrypted_data = await resp.read()

Args:
data (Union[str, bytes]): Data to be encrypted.
options (EncryptOptions): Encryption options.

Returns:
Readable stream of `api_v1.EncryptResponse`.

Raises:
ValueError: If component_name, key_name, or key_wrap_algorithm is empty.
"""
# Warnings and input validation
warn(
'The Encrypt API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(
component_name=options.component_name,
key_name=options.key_name,
key_wrap_algorithm=options.key_wrap_algorithm,
)

req_iterator = EncryptRequestIterator(data, options)
resp_stream = self._stub.EncryptAlpha1(req_iterator)
return EncryptResponse(resp_stream)

async def decrypt(self, data: Union[str, bytes], options: DecryptOptions):
"""Decrypt a stream data with given options.

The decrypt API decrypts a stream data with the given options.

Example:
from dapr.aio.clients import DaprClient
from dapr.clients.grpc._crypto import DecryptOptions

async with DaprClient() as d:
options = DecryptOptions(
component_name='crypto_component',
key_name='crypto_key',
)
resp = await d.decrypt(
data='hello dapr',
options=options,
)
decrypted_data = await resp.read()

Args:
data (Union[str, bytes]): Data to be decrypted.
options (DecryptOptions): Decryption options.

Returns:
Readable stream of `api_v1.DecryptResponse`.

Raises:
ValueError: If component_name is empty.
"""
# Warnings and input validation
warn(
'The Decrypt API is an Alpha version and is subject to change.',
UserWarning,
stacklevel=2,
)
validateNotBlankString(
component_name=options.component_name,
)

req_iterator = DecryptRequestIterator(data, options)
resp_stream = self._stub.DecryptAlpha1(req_iterator)
return DecryptResponse(resp_stream)

async def start_workflow(
self,
workflow_component: str,
Expand Down
Loading