Skip to content

Commit

Permalink
Merge pull request #64 from bbc/dev
Browse files Browse the repository at this point in the history
Merge dev into master for v2.8.0
  • Loading branch information
jamesba committed Nov 27, 2019
2 parents 8c675fb + 59d6d2b commit 0b74eea
Show file tree
Hide file tree
Showing 14 changed files with 94 additions and 109 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Mediagrains Library Changelog

## 2.8.0
- Switched to using asynctest for testing asynchronous code
- Made asynchronous IO wrappers that wrap synchronous IO do so using an executor thread pool
- Added support for automatically wrapping synchronous files in asynchronous wrappers to use them in gsf encoder

## 2.7.2
- Bugfix: Restore behaviour whereby grains can be added to a segment object during an active progressive encode

Expand Down
13 changes: 8 additions & 5 deletions mediagrains/grain.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
Sized,
Iterator,
Iterable,
Awaitable)
Awaitable,
Generator)
from typing_extensions import Literal
from .typing import (
RationalTypes,
Expand Down Expand Up @@ -271,10 +272,12 @@ def __bytes__(self) -> Optional[bytes]:
def has_data(self) -> bool:
return self._data is not None

async def __await__(self) -> Optional[GrainDataType]:
if self._data is None and self._data_fetcher_coroutine is not None:
self._data = await self._data_fetcher_coroutine
return self._data
def __await__(self) -> Generator[Any, None, Optional[GrainDataType]]:
async def __inner():
if self._data is None and self._data_fetcher_coroutine is not None:
self._data = await self._data_fetcher_coroutine
return self._data
return __inner().__await__()

async def __aenter__(self):
await self
Expand Down
6 changes: 4 additions & 2 deletions mediagrains/gsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1415,8 +1415,10 @@ def __exit__(self, *args, **kwargs):

async def __aenter__(self):
if not isinstance(self.file, AsyncBinaryIO) and not isinstance(self.file, OpenAsyncBinaryIO):
raise ValueError("To use in asynchronous mode the file must be an asynchronously writeable file-like object")
self._open_async_encoder = OpenAsyncGSFEncoder(self.file,
f = AsyncFileWrapper(self.file)
else:
f = self.file
self._open_async_encoder = OpenAsyncGSFEncoder(f,
self.major,
self.minor,
self.id,
Expand Down
12 changes: 7 additions & 5 deletions mediagrains/numpy/videograin.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import numpy as np
from numpy.lib.stride_tricks import as_strided

from typing import Callable, Dict, Tuple, Optional, Awaitable, cast, overload
from typing import Callable, Dict, Tuple, Optional, Awaitable, cast, overload, Generator, Any
from ..typing import VideoGrainMetadataDict, GrainDataType, GrainDataParameterType

from inspect import isawaitable
Expand Down Expand Up @@ -280,10 +280,12 @@ def __repr__(self) -> str:
else:
return "{}({!r},< numpy data of length {} >)".format(self._factory, self.meta, len(self.data))

async def __await__(self):
if self._data is None and self._data_fetcher_coroutine is not None:
self.data = await self._data_fetcher_coroutine
return self.data
def __await__(self) -> Generator[Any, None, np.ndarray]:
async def __inner():
if self._data is None and self._data_fetcher_coroutine is not None:
self.data = await self._data_fetcher_coroutine
return self.data
return __inner().__await__()

async def __aenter__(self) -> "VIDEOGRAIN":
await self
Expand Down
47 changes: 35 additions & 12 deletions mediagrains/utils/asyncbinaryio.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@
from abc import ABCMeta, abstractmethod
from io import SEEK_SET, SEEK_CUR

from typing import Type, Union, Optional, IO, cast
from typing import Type, Union, Optional, IO, cast, TypeVar, Callable, Coroutine
from io import RawIOBase, UnsupportedOperation

from asyncio import StreamReader, StreamWriter
import asyncio
from functools import wraps

from deprecated import deprecated


class OpenAsyncBinaryIO(metaclass=ABCMeta):
Expand Down Expand Up @@ -97,6 +101,16 @@ async def __aexit__(self, *args, **kwargs) -> None:
await self._inst.__close__()


T = TypeVar("T")


def wrap_in_executor(f: Callable[..., T]) -> Callable[..., Coroutine[None, None, T]]:
@wraps(f)
async def __inner(*args, **kwargs):
return await asyncio.get_event_loop().run_in_executor(None, lambda: f(*args, **kwargs))
return __inner


class OpenAsyncBytesIO(OpenAsyncBinaryIO):
def __init__(self, b: bytes):
self._buffer = bytearray(b)
Expand All @@ -107,7 +121,8 @@ async def __open__(self) -> None:
"This coroutine should include any code that is to be run when the io stream is opened"
self._pos = 0

async def readinto(self, b: bytearray) -> int:
@wrap_in_executor
def readinto(self, b: bytearray) -> int:
length = min(self._len - self._pos, len(b))
if length > 0:
b[:length] = self._buffer[self._pos:self._pos + length]
Expand All @@ -116,13 +131,15 @@ async def readinto(self, b: bytearray) -> int:
else:
return 0

async def readall(self) -> bytes:
@wrap_in_executor
def readall(self) -> bytes:
if self._pos >= 0 and self._pos < self._len:
return bytes(self._buffer[self._pos:self._len])
else:
return bytes()

async def write(self, b: bytes) -> int:
@wrap_in_executor
def write(self, b: bytes) -> int:
if self._pos < 0:
return 0

Expand Down Expand Up @@ -168,7 +185,11 @@ def writable(self) -> bool:
def getbuffer(self) -> bytearray:
return self._buffer[:self._len]

@deprecated(version="2.8.0", reason="Use getvalue instead to be more consistent with the standard library")
def value(self) -> bytes:
return self.getvalue()

def getvalue(self) -> bytes:
return bytes(self._buffer[:self._len])


Expand All @@ -181,38 +202,40 @@ def getbuffer(self) -> bytearray:
return self._inst.getbuffer()

def value(self) -> bytes:
return self._inst.value()
return self.getvalue()

def getvalue(self) -> bytes:
return self._inst.getvalue()


class OpenAsyncFileWrapper(OpenAsyncBinaryIO):
def __init__(self, fp: IO[bytes]):
self.fp = cast(RawIOBase, fp)

async def __open__(self) -> None:
# self.fp.__enter__()
pass

async def __close__(self) -> None:
# self.fp.__exit__(None, None, None)
pass

async def read(self, s: int = -1) -> bytes:
@wrap_in_executor
def read(self, s: int = -1) -> bytes:
while True:
r = self.fp.read(s)
if r is not None:
return r

async def readinto(self, b: bytearray) -> Optional[int]:
return self.fp.readinto(b)
return await wrap_in_executor(self.fp.readinto)(b)

async def readall(self) -> bytes:
return self.fp.readall()
return await wrap_in_executor(self.fp.readall)()

async def write(self, b: bytes) -> Optional[int]:
return self.fp.write(b)
return await wrap_in_executor(self.fp.write)(b)

async def truncate(self, size: Optional[int] = None) -> int:
return self.fp.truncate(size)
return await wrap_in_executor(self.fp.truncate)(size)

def tell(self) -> int:
return self.fp.tell()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
]

setup(name="mediagrains",
version="2.7.2",
version="2.8.0",
python_requires='>=3.6.0',
description="Simple utility for grain-based media",
url='https://github.com/bbc/rd-apmm-python-lib-mediagrains',
Expand Down
4 changes: 2 additions & 2 deletions tests/test_comparison.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
# limitations under the License.
#

import unittest
from unittest import TestCase
import asynctest as unittest
from asynctest import TestCase

from hypothesis import given, assume, settings, HealthCheck
from hypothesis.strategies import sampled_from, just, tuples
Expand Down
8 changes: 2 additions & 6 deletions tests/test_grain.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,16 @@
# limitations under the License.
#

from unittest import TestCase
from asynctest import TestCase
import uuid
from mediagrains import Grain, VideoGrain, AudioGrain, CodedVideoGrain, CodedAudioGrain, EventGrain
from mediagrains.cogenums import CogFrameFormat, CogFrameLayout, CogAudioFormat
from mediatimestamp.immutable import Timestamp, TimeOffset, TimeRange
import mock
from asynctest import mock
from fractions import Fraction
import json
from copy import copy, deepcopy

from fixtures import async_test


src_id = uuid.UUID("f18ee944-0841-11e8-b0b0-17cef04bd429")
flow_id = uuid.UUID("f79ce4da-0841-11e8-9a5b-dfedb11bafeb")
Expand Down Expand Up @@ -671,7 +669,6 @@ def test_grain_makes_videograin_without_data(self):
self.assertEqual(grain.length, 0)
self.assertEqual(grain.expected_length, 8192*1080)

@async_test
async def test_videograin_with_async_data(self):
meta = VIDEOGRAIN_TEST_METADATA

Expand All @@ -695,7 +692,6 @@ async def _get_data():
self.assertEqual((await grain)[:16], expected_data[:16])
self.assertEqual(grain.data[:16], expected_data[:16])

@async_test
async def test_videograin_with_async_data_as_acm(self):
meta = VIDEOGRAIN_TEST_METADATA

Expand Down
Loading

0 comments on commit 0b74eea

Please sign in to comment.