Skip to content

Commit

Permalink
Cleanup (#11)
Browse files Browse the repository at this point in the history
* Remove unused imports

* Run pyupgrade for 3.6

* Run pyupgrade for 3.9

* Run pyupgrade for 3.10

* Use ruff to sort imports

* Apply ruff fixes

* Format with ruff

* Relax versions to match existing uses

* Add ruff to local workflow

* Run tests with all supported versions of Python

* Explicitly close writer so waiting will work properly (fixes #12)
  • Loading branch information
daveisfera committed Jan 18, 2024
1 parent 4ce5cc2 commit 3faab2c
Show file tree
Hide file tree
Showing 32 changed files with 353 additions and 318 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/pull-requests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@ concurrency:
jobs:
testing:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v3
- uses: FedericoCarboni/setup-ffmpeg@v2
id: setup-ffmpeg
- uses: actions/setup-python@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: '3.11'
python-version: ${{ matrix.python-version }}
- run: make install
- run: make unittest
17 changes: 17 additions & 0 deletions .github/workflows/ruff.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: ruff

run-name: Ruff

on:
push:
branches:
- "**"

jobs:
ruff:
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v4

- run: ./ruff.sh check .
17 changes: 17 additions & 0 deletions .github/workflows/ruff_format.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: ruff

run-name: Ruff

on:
push:
branches:
- "**"

jobs:
ruff:
runs-on: ubuntu-latest
steps:
- name: Checkout Code
uses: actions/checkout@v4

- run: ./ruff.sh format --check .
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ install:
@pip install -r requirements.txt
@pip install -r requirements-dev.txt

ruff:
@ruff --fix .
@ruff format .

unittest:
@cd tests && pytest ./ --no-header

Expand Down
22 changes: 10 additions & 12 deletions example/demo_ffmpeg.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
from __future__ import annotations

import asyncio
import os
import logging
from asyncio import StreamReader, Server
import os
from asyncio import StreamReader

from pyrtmp import StreamClosedException
from pyrtmp.flv import FLVMediaType, FLVWriter
from pyrtmp.rtmp import RTMPProtocol, SimpleRTMPController, SimpleRTMPServer
from pyrtmp.session_manager import SessionManager
from pyrtmp.rtmp import SimpleRTMPController, RTMPProtocol, SimpleRTMPServer

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class RTMP2SocketController(SimpleRTMPController):

def __init__(self, output_directory: str):
self.output_directory = output_directory
super().__init__()

async def on_ns_publish(self, session, message) -> None:
publishing_name = message.publishing_name
prefix = os.path.join(self.output_directory, f'{publishing_name}')
prefix = os.path.join(self.output_directory, f"{publishing_name}")
session.state = RemoteProcessFLVWriter()
logger.debug(f'output to {prefix}.flv')
logger.debug(f"output to {prefix}.flv")
await session.state.initialize(
command=f"ffmpeg -y -i pipe:0 -c:v copy -c:a copy -f flv {prefix}.flv",
stdout_log=f'{prefix}.stdout.log',
stderr_log=f'{prefix}.stderr.log',
stdout_log=f"{prefix}.stdout.log",
stderr_log=f"{prefix}.stderr.log",
)
session.state.write_header()
await super().on_ns_publish(session, message)
Expand All @@ -51,7 +51,6 @@ async def on_stream_closed(self, session: SessionManager, exception: StreamClose


class RemoteProcessFLVWriter:

def __init__(self):
self.proc = None
self.stdout = None
Expand All @@ -69,7 +68,7 @@ async def initialize(self, command: str, stdout_log: str, stderr_log: str):
self.stderr = asyncio.create_task(self._read_to_file(stderr_log, self.proc.stderr))

async def _read_to_file(self, filename: str, stream: StreamReader):
fp = open(filename, 'wt')
fp = open(filename, "w")
while not stream.at_eof():
data = await stream.readline()
fp.write(data.decode())
Expand All @@ -91,7 +90,6 @@ async def close(self):


class SimpleServer(SimpleRTMPServer):

def __init__(self, output_directory: str):
self.output_directory = output_directory
super().__init__()
Expand All @@ -108,7 +106,7 @@ async def create(self, host: str, port: int):
async def main():
current_dir = os.path.dirname(os.path.abspath(__file__))
server = SimpleServer(output_directory=current_dir)
await server.create(host='0.0.0.0', port=1935)
await server.create(host="0.0.0.0", port=1935)
await server.start()
await server.wait_closed()

Expand Down
8 changes: 3 additions & 5 deletions example/demo_flvdump.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
import asyncio
import os
import logging
import os

from pyrtmp import StreamClosedException
from pyrtmp.flv import FLVFileWriter, FLVMediaType
from pyrtmp.rtmp import RTMPProtocol, SimpleRTMPController, SimpleRTMPServer
from pyrtmp.session_manager import SessionManager
from pyrtmp.rtmp import SimpleRTMPController, RTMPProtocol, SimpleRTMPServer

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


class RTMP2FLVController(SimpleRTMPController):

def __init__(self, output_directory: str):
self.output_directory = output_directory
super().__init__()
Expand Down Expand Up @@ -42,7 +41,6 @@ async def on_stream_closed(self, session: SessionManager, exception: StreamClose


class SimpleServer(SimpleRTMPServer):

def __init__(self, output_directory: str):
self.output_directory = output_directory
super().__init__()
Expand All @@ -59,7 +57,7 @@ async def create(self, host: str, port: int):
async def main():
current_dir = os.path.dirname(os.path.abspath(__file__))
server = SimpleServer(output_directory=current_dir)
await server.create(host='0.0.0.0', port=1935)
await server.create(host="0.0.0.0", port=1935)
await server.start()
await server.wait_closed()

Expand Down
48 changes: 24 additions & 24 deletions example/demo_rtmpt.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import os
import uuid
from asyncio import StreamReader, StreamWriter, events, BaseProtocol
from asyncio import BaseProtocol, StreamReader, StreamWriter, events
from io import BytesIO

import quart
Expand All @@ -23,7 +23,6 @@


class DummyProtocol(BaseProtocol):

async def _drain_helper(self):
pass

Expand All @@ -32,12 +31,9 @@ async def _get_close_waiter(self, stream: StreamWriter):


class RTMPTWrapper:

def __init__(self,
session_id: str,
peer: tuple,
controller: BaseRTMPController,
loop: events.AbstractEventLoop) -> None:
def __init__(
self, session_id: str, peer: tuple, controller: BaseRTMPController, loop: events.AbstractEventLoop
) -> None:
self.delay = 0
self.loop = loop
self.session_id = session_id
Expand All @@ -46,12 +42,16 @@ def __init__(self,
self.stream = BytesIO()
self.reader = StreamReader(loop=self.loop)
self.writer = StreamWriter(
BufferedWriteTransport(self.stream, extra={
"peername": peer,
}),
BufferedWriteTransport(
self.stream,
extra={
"peername": peer,
},
),
DummyProtocol(),
self.reader,
self.loop)
self.loop,
)
self.task = self.loop.create_task(self._dispatcher())

async def _dispatcher(self):
Expand Down Expand Up @@ -91,7 +91,7 @@ async def get_buffered_data(self):
return buffer


@app.route('/open/<int:segment>', methods=['POST'])
@app.route("/open/<int:segment>", methods=["POST"])
async def open(segment: int):
# body = await request.body
# assert body == b'\x00'
Expand All @@ -103,42 +103,42 @@ async def open(segment: int):
loop=asyncio.get_running_loop(),
)
resp = quart.Response(sid)
resp.headers['Content-Type'] = 'application/x-fcs'
resp.headers['Cache-Control'] = 'no-cache'
resp.headers["Content-Type"] = "application/x-fcs"
resp.headers["Cache-Control"] = "no-cache"
return resp


@app.route('/send/<string:sid>/<int:segment>', methods=['POST'])
@app.route("/send/<string:sid>/<int:segment>", methods=["POST"])
async def send(sid: str, segment: int):
body = await request.body
session[sid].reader.feed_data(body)
data = await session[sid].read_from_buffer()
resp = quart.Response(data)
resp.headers['Content-Type'] = 'application/x-fcs'
resp.headers['Cache-Control'] = 'no-cache'
resp.headers["Content-Type"] = "application/x-fcs"
resp.headers["Cache-Control"] = "no-cache"
return resp


@app.route('/idle/<string:sid>/<int:segment>', methods=['POST'])
@app.route("/idle/<string:sid>/<int:segment>", methods=["POST"])
async def idle(sid: str, segment: int):
# body = await request.body
# assert body == b'\x00'
data = await session[sid].read_from_buffer()
resp = quart.Response(data)
resp.headers['Content-Type'] = 'application/x-fcs'
resp.headers['Cache-Control'] = 'no-cache'
resp.headers["Content-Type"] = "application/x-fcs"
resp.headers["Cache-Control"] = "no-cache"
return resp


@app.route('/close/<string:sid>/<int:segment>', methods=['POST'])
@app.route("/close/<string:sid>/<int:segment>", methods=["POST"])
async def close(sid: str, segment: int):
# body = await request.body
# assert body == b'\x00'
data = await session[sid].read_from_buffer()
session[sid].close()
resp = quart.Response(data)
resp.headers['Content-Type'] = 'application/x-fcs'
resp.headers['Cache-Control'] = 'no-cache'
resp.headers["Content-Type"] = "application/x-fcs"
resp.headers["Cache-Control"] = "no-cache"
return resp


Expand Down
12 changes: 6 additions & 6 deletions pyrtmp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import os
from asyncio import StreamReader, WriteTransport
from collections.abc import Mapping
from io import BytesIO
from typing import Any, List, Optional, Mapping
from typing import Any

from bitstring import BitStream
from bitstring.utils import tokenparser

Expand All @@ -15,7 +17,6 @@ class StreamClosedException(Exception):


class BitStreamReader:

def __init__(self, reader: StreamReader) -> None:
self.reader = reader
self.buffer = BitStream()
Expand Down Expand Up @@ -49,14 +50,13 @@ def _append(self, data: bytes) -> None:


class BufferedWriteTransport(WriteTransport):

def __init__(self, buffer: BytesIO, extra: Optional[Mapping[Any, Any]] = ...) -> None:
def __init__(self, buffer: BytesIO, extra: Mapping[Any, Any] | None = ...) -> None:
self._buffer = buffer
self._closing = False
self._closed = False
super().__init__(extra)

def set_write_buffer_limits(self, high: Optional[int] = ..., low: Optional[int] = ...) -> None:
def set_write_buffer_limits(self, high: int | None = ..., low: int | None = ...) -> None:
raise NotImplementedError

def get_write_buffer_size(self) -> int:
Expand All @@ -65,7 +65,7 @@ def get_write_buffer_size(self) -> int:
def write(self, data: Any) -> None:
self._buffer.write(data)

def writelines(self, list_of_data: List[Any]) -> None:
def writelines(self, list_of_data: list[Any]) -> None:
raise NotImplementedError

def write_eof(self) -> None:
Expand Down
Loading

0 comments on commit 3faab2c

Please sign in to comment.