Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: creating media player that works without transcoding #559

Merged
merged 1 commit into from
Apr 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/aiortc/codecs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import List, Tuple

from av.frame import Frame
from av.packet import Packet

from ..jitterbuffer import JitterFrame

Expand All @@ -18,3 +19,6 @@ def encode(
self, frame: Frame, force_keyframe: bool = False
) -> Tuple[List[bytes], int]:
pass # pragma: no cover

def pack(self, packet: Packet) -> Tuple[List[bytes], int]:
return bytes(packet), int(packet.pts)
6 changes: 6 additions & 0 deletions src/aiortc/codecs/h264.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import av
from av.frame import Frame
from av.packet import Packet

from ..jitterbuffer import JitterFrame
from ..mediastreams import VIDEO_TIME_BASE, convert_timebase
Expand Down Expand Up @@ -319,6 +320,11 @@ def encode(
timestamp = convert_timebase(frame.pts, frame.time_base, VIDEO_TIME_BASE)
return self._packetize(packages), timestamp

def pack(self, packet: Packet) -> Tuple[List[bytes], int]:
assert isinstance(packet, av.Packet)
packages = self._split_bitstream(bytes(packet))
return self._packetize(packages), int(packet.pts)

@property
def target_bitrate(self) -> int:
"""
Expand Down
88 changes: 70 additions & 18 deletions src/aiortc/contrib/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
import logging
import threading
import time
from typing import Dict, Optional, Set
from typing import Dict, Optional, Set, Union

import av
from av import AudioFrame, VideoFrame
from av.frame import Frame
from av.packet import Packet
from av.video.stream import VideoStream
jlaine marked this conversation as resolved.
Show resolved Hide resolved

from ..mediastreams import AUDIO_PTIME, MediaStreamError, MediaStreamTrack

logger = logging.getLogger(__name__)


REAL_TIME_FORMATS = [
"alsa",
"android_camera",
Expand Down Expand Up @@ -81,7 +82,7 @@ async def stop(self):
self.__tracks = {}


def player_worker(
def player_worker_decode(
loop,
container,
streams,
Expand Down Expand Up @@ -109,7 +110,7 @@ def player_worker(
while not quit_event.is_set():
try:
frame = next(container.decode(*streams))
except (av.AVError, StopIteration) as exc:
except Exception as exc:
if isinstance(exc, av.FFmpegError) and exc.errno == errno.EAGAIN:
time.sleep(0.01)
continue
Expand Down Expand Up @@ -153,6 +154,47 @@ def player_worker(
asyncio.run_coroutine_threadsafe(video_track._queue.put(frame), loop)


def player_worker_demux(
loop,
container,
streams,
audio_track,
video_track,
quit_event,
throttle_playback,
loop_playback,
):
frame_time = None
start_time = time.time()

while not quit_event.is_set():
try:
packet = next(container.demux(*streams))
if not packet.size:
raise StopIteration
except Exception as exc:
if isinstance(exc, av.FFmpegError) and exc.errno == errno.EAGAIN:
time.sleep(0.01)
continue
if isinstance(exc, StopIteration) and loop_playback:
container.seek(0)
continue
if video_track:
asyncio.run_coroutine_threadsafe(video_track._queue.put(None), loop)
break

# read up to 1 second ahead
if throttle_playback:
elapsed_time = time.time() - start_time
if frame_time and frame_time > elapsed_time + 1:
time.sleep(0.1)

if isinstance(packet.stream, VideoStream) and video_track:
if packet.pts is not None and packet.time_base is not None:
frame_time = int(packet.pts * packet.time_base)
asyncio.run_coroutine_threadsafe(video_track._queue.put(packet), loop)


class PlayerStreamTrack(MediaStreamTrack):
def __init__(self, player, kind):
super().__init__()
Expand All @@ -161,30 +203,33 @@ def __init__(self, player, kind):
self._queue = asyncio.Queue()
self._start = None

async def recv(self):
async def recv(self) -> Union[Frame, Packet]:
if self.readyState != "live":
raise MediaStreamError

self._player._start(self)
frame = await self._queue.get()
if frame is None:
data = await self._queue.get()
if data is None:
self.stop()
raise MediaStreamError
frame_time = frame.time
if isinstance(data, Frame):
data_time = data.time
elif isinstance(data, Packet):
data_time = float(data.pts * data.time_base)

# control playback rate
if (
self._player is not None
and self._player._throttle_playback
and frame_time is not None
and data_time is not None
):
if self._start is None:
self._start = time.time() - frame_time
self._start = time.time() - data_time
else:
wait = self._start + frame_time - time.time()
wait = self._start + data_time - time.time()
await asyncio.sleep(wait)

return frame
return data

def stop(self):
super().stop()
Expand Down Expand Up @@ -230,23 +275,30 @@ class MediaPlayer:
:param loop: Whether to repeat playback indefinitely (requires a seekable file).
"""

def __init__(self, file, format=None, options={}, loop=False):
def __init__(self, file, format=None, options={}, loop=False, decode=True):
self.__container = av.open(file=file, format=format, mode="r", options=options)
self.__thread: Optional[threading.Thread] = None
self.__thread_quit: Optional[threading.Event] = None

# examine streams
self.__started: Set[PlayerStreamTrack] = set()
self.__streams = []
self.__decode = decode
self.__audio: Optional[PlayerStreamTrack] = None
self.__video: Optional[PlayerStreamTrack] = None
for stream in self.__container.streams:
if stream.type == "audio" and not self.__audio:
self.__audio = PlayerStreamTrack(self, kind="audio")
self.__streams.append(stream)
if self.__decode:
self.__audio = PlayerStreamTrack(self, kind="audio")
self.__streams.append(stream)
elif stream.type == "video" and not self.__video:
self.__video = PlayerStreamTrack(self, kind="video")
self.__streams.append(stream)
if not self.__decode:
if stream.codec_context.name in ["h264", "mpeg4"]:
self.__video = PlayerStreamTrack(self, kind="video")
self.__streams.append(stream)
else:
self.__video = PlayerStreamTrack(self, kind="video")
self.__streams.append(stream)

# check whether we need to throttle playback
container_format = set(self.__container.format.name.split(","))
Expand Down Expand Up @@ -279,7 +331,7 @@ def _start(self, track: PlayerStreamTrack) -> None:
self.__thread_quit = threading.Event()
self.__thread = threading.Thread(
name="media-player",
target=player_worker,
target=player_worker_decode if self.__decode else player_worker_demux,
args=(
asyncio.get_event_loop(),
self.__container,
Expand Down
8 changes: 5 additions & 3 deletions src/aiortc/mediastreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import time
import uuid
from abc import ABCMeta, abstractmethod
from typing import Tuple
from typing import Tuple, Union

from av import AudioFrame, VideoFrame
from av.frame import Frame
from av.packet import Packet
from pyee.asyncio import AsyncIOEventEmitter

AUDIO_PTIME = 0.020 # 20ms audio packetization
Expand Down Expand Up @@ -51,9 +52,10 @@ def readyState(self) -> str:
return "ended" if self.__ended else "live"

@abstractmethod
async def recv(self) -> Frame:
async def recv(self) -> Union[Frame, Packet]:
"""
Receive the next :class:`~av.audio.frame.AudioFrame` or :class:`~av.video.frame.VideoFrame`.
Receive the next :class:`~av.audio.frame.AudioFrame`, :class:`~av.video.frame.VideoFrame`
or :class:`~av.packet.Packet`
"""

def stop(self) -> None:
Expand Down
26 changes: 16 additions & 10 deletions src/aiortc/rtcrtpsender.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Callable, Dict, List, Optional, Union

from av import AudioFrame
from av.frame import Frame

from . import clock, rtp
from .codecs import get_capabilities, get_encoder, is_rtx
Expand Down Expand Up @@ -265,20 +266,25 @@ async def _handle_rtcp_packet(self, packet):
pass

async def _next_encoded_frame(self, codec: RTCRtpCodecParameters):
# get frame
frame = await self.__track.recv()
# get [Frame|Packet]
data = await self.__track.recv()
audio_level = None
if isinstance(frame, AudioFrame):
audio_level = rtp.compute_audio_level_dbov(frame)

# encode frame
if self.__encoder is None:
self.__encoder = get_encoder(codec)
force_keyframe = self.__force_keyframe
self.__force_keyframe = False
payloads, timestamp = await self.__loop.run_in_executor(
None, self.__encoder.encode, frame, force_keyframe
)

if isinstance(data, Frame):
# encode frame
if isinstance(data, AudioFrame):
audio_level = rtp.compute_audio_level_dbov(data)

force_keyframe = self.__force_keyframe
self.__force_keyframe = False
payloads, timestamp = await self.__loop.run_in_executor(
None, self.__encoder.encode, data, force_keyframe
rprata marked this conversation as resolved.
Show resolved Hide resolved
)
else:
payloads, timestamp = self.__encoder.pack(data)

return RTCEncodedFrame(payloads, timestamp, audio_level)

Expand Down
11 changes: 11 additions & 0 deletions tests/codecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from unittest import TestCase

from av import AudioFrame, VideoFrame
from av.packet import Packet

from aiortc.codecs import depayload, get_decoder, get_encoder
from aiortc.jitterbuffer import JitterFrame
Expand Down Expand Up @@ -47,6 +48,16 @@ def create_video_frame(
frame.time_base = time_base
return frame

def create_video_packet(self, header, pts):
"""
Create a single blank video packet.
"""
buffer = header + [0]
packet = Packet(len(buffer))
packet.update(bytes(buffer))
packet.pts = pts
return packet

def create_video_frames(self, width, height, count, time_base=VIDEO_TIME_BASE):
"""
Create consecutive blank video frames.
Expand Down
Loading