import numpy as np
import pyaudio
from damp11113 import getDBFS, RTSubtract, RTAdd
from pyogg import OpusBufferedEncoder
from pyogg import OpusDecoder
from pyogg import opus
# Initialize PyAudio. This single instance opens both the capture and the
# playback stream further down and is released with p.terminate() on exit.
p = pyaudio.PyAudio()
# Level-meter glyphs: `bar` is repeated once per unit of level, `clip` is
# appended when a meter exceeds `maxrange`, and `bg` pads each meter to a
# fixed width.
bar, clip, bg = "|", "!", " "

# Meter geometry and scaling.
maxrange = 10  # largest number of bar glyphs drawn per channel
gain = 0.01    # multiplier applied to the measured level before drawing

# Sampling rate in Hz, shared by the PyAudio streams and the Opus decoder.
sample_rate = 48_000
# Create the Opus encoder used for the loopback test.
# The call order below is kept exactly as in the original script.
opus_encoder = OpusBufferedEncoder()
opus_encoder.set_application("audio")
# Use the shared sample_rate rather than a second hard-coded 48000, so the
# encoder cannot drift from the decoder and the PyAudio streams, which are
# all configured from that same value.
opus_encoder.set_sampling_frequency(sample_rate)
opus_encoder.set_channels(2)
opus_encoder.set_bitrates(19000)  # presumably bits per second -- TODO confirm
#opus_encoder.set_bandwidth("narrowband")
# Start with the cheapest settings; the main loop later switches the encoder
# to "VBR" and complexity 10.
opus_encoder.set_compresion_complex(0)
opus_encoder.set_bitrate_mode("CBR")
opus_encoder.set_frame_size(60)  # presumably frame duration in ms -- TODO confirm
#opus_encoder.set_packets_loss(100)
# ---- Decoder ---------------------------------------------------------------
# Two-channel Opus decoder running at the shared sample rate.
opus_decoder = OpusDecoder()
opus_decoder.set_channels(2)
opus_decoder.set_sampling_frequency(sample_rate)

# ---- Audio I/O -------------------------------------------------------------
# The capture and playback streams use one common format: 16-bit samples,
# two channels, at sample_rate.
_stream_format = {
    "format": pyaudio.paInt16,
    "channels": 2,
    "rate": sample_rate,
}
streaminput = p.open(input=True, **_stream_format)
streamoutput = p.open(output=True, **_stream_format)

# Counter read and incremented by the main loop; it triggers the one-off
# encoder retune when it reaches 100.
frame = 0
def _level_meter(samples):
    """Return the bar-graph string for one channel of int16 samples.

    The number of bars is the mean absolute sample value scaled by
    ``50 * 50 / 2 ** 16 * gain``. When more than ``maxrange`` bars would be
    drawn, the string is cut to ``maxrange`` bars and a single ``clip``
    marker is appended.
    """
    # Widen before taking the magnitude: abs(-32768) wraps back to -32768 in
    # int16 and would pull the average down.
    level = np.average(np.abs(samples.astype(np.int32))) * 50
    bars = bar * int((50 * level / 2 ** 16) * gain)
    if len(bars) > maxrange:
        return bars[:maxrange] + clip
    return bars


def _status_label(side_dbfs, mid_dbfs):
    """Pick the status tag from the getDBFS values of L-R and L+R.

    "((( ST )))" when both values exceed -50, "MONO" when only the L+R value
    exceeds -70, otherwise "NOIN".
    """
    if side_dbfs > -50 and mid_dbfs > -50:
        return "((( ST )))"
    if mid_dbfs > -70:
        return "MONO"
    return "NOIN"


# Loopback: capture -> Opus encode -> Opus decode -> level meter -> playback.
# Runs until capture returns no data, an error occurs, or the user hits Ctrl+C.
try:
    while True:
        try:
            # Read 1024 frames of interleaved 16-bit audio from the input.
            pcm = np.frombuffer(
                streaminput.read(1024, exception_on_overflow=False),
                dtype=np.int16,
            )
            if pcm.size == 0:
                # Nothing captured: stop the loop.
                break

            # The samples are copied into a bytearray before being wrapped
            # (presumably the encoder needs a writable buffer -- TODO confirm).
            encoded_packets = opus_encoder.buffered_encode(
                memoryview(bytearray(pcm))
            )

            for encoded_packet, _, _ in encoded_packets:
                decoded_pcm = opus_decoder.decode(encoded_packet)

                if len(decoded_pcm) > 0:
                    pcm_to_write = np.frombuffer(decoded_pcm, dtype=np.int16)

                    # De-interleave: even indices are treated as the left
                    # channel, odd indices as the right channel.
                    L = pcm_to_write[0::2]
                    R = pcm_to_write[1::2]

                    # Presumably the element-wise sum and difference of the
                    # two channels -- TODO confirm against damp11113.
                    LPR = RTAdd(L, R)
                    LSR = RTSubtract(L, R)

                    left_meter = _level_meter(L).ljust(maxrange + 1, bg)
                    right_meter = _level_meter(R).ljust(maxrange + 1, bg)
                    label = _status_label(
                        getDBFS(LSR, 32767), getDBFS(LPR, 32767)
                    )

                    # The label is centred in a fixed-width field so every
                    # status line has the same length; otherwise the "\r"
                    # overwrite leaves stale characters from a longer line.
                    print(
                        f"\r {label:^10} | [L {left_meter} | R {right_meter}]",
                        flush=True,
                        end="",
                    )
                    streamoutput.write(pcm_to_write.tobytes())
                else:
                    print("Decoded PCM is empty")

                # NOTE(review): the original indentation was lost, so it is
                # unclear whether this counter advanced per decoded packet or
                # per capture read; per packet is assumed here -- confirm.
                if frame == 100:
                    # One-off retune after the first 100 counted frames.
                    opus_encoder.set_bitrate_mode("VBR")
                    opus_encoder.set_compresion_complex(10)
                frame += 1
        except Exception as e:
            # Show the error message immediately, then let it propagate.
            print(e)
            raise
except KeyboardInterrupt:
    print("Interrupted by user")
finally:
    # Clean up PyAudio streams and terminate PyAudio. The steps are nested so
    # that a failure while closing one stream still closes the other and
    # still terminates PyAudio.
    try:
        streaminput.stop_stream()
        streaminput.close()
    finally:
        try:
            streamoutput.stop_stream()
            streamoutput.close()
        finally:
            p.terminate()
To install
pip install git+https://github.com/damp11113/PyOgg.git
PyOgg provides Python bindings for Xiph.org’s Opus, Vorbis and FLAC audio file formats as well as their Ogg container format.
PyOgg:
- Reads and streams Opus, Vorbis, and FLAC audio formats in their standard file format (that is, from within Ogg containers).
- Writes Opus files (that is, Opus-formatted packets into Ogg containers).
- Reads and writes Opus-formatted packets (transported, for example, via UDP).
Further, should you wish to have still lower-level access, PyOgg provides ctypes interfaces that give direct access to the C functions and datatypes found in the libraries.
Under Windows, PyOgg comes bundled with the required dynamic libraries (DLLs) in the Windows Wheel distributions.
Under macOS, the required libraries can be easily installed using Homebrew.
PyOgg is not capable of playing audio; however, you can use Python audio libraries such as simpleaudio, sounddevice, or PyOpenAL to play audio. PyOpenAL even offers 3D playback.
For more detail, including installation instructions, please see the documentation at Read the Docs.