forked from GlasgowEmbedded/glasgow
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
37cd27d
commit f4cc32f
Showing
2 changed files
with
185 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
import logging | ||
import argparse | ||
from nmigen import * | ||
|
||
from ....gateware.pads import * | ||
from ... import * | ||
|
||
|
||
""" | ||
Sample data on rising edge of clock | ||
Left channel when WS is low | ||
Right channel when WS is high | ||
""" | ||
|
||
|
||
class AudioI2SCaptureSubtarget(Elaboratable): | ||
def __init__(self, pads, in_fifo, clk_edge="r", mode="i2s", frame_size=16): | ||
self.pads = pads | ||
self.in_fifo = in_fifo | ||
self.clk_edge = clk_edge | ||
self.mode = mode | ||
self.frame_size = frame_size | ||
|
||
def elaborate(self, platform): | ||
m = Module() | ||
|
||
# allow clock edge detection... sampled on sys_clock | ||
# clk_edge == 0x1 / 0b01 <-- rising edge | ||
# clk_edge == 0x2 / 0b10 <-- falling edge | ||
clk_edge = Signal(2) | ||
clk = Signal() | ||
m.d.sync += clk_edge.eq(Cat(self.pads.clk_t.i, clk_edge[0])) | ||
if self.clk_edge in ("r", "rising"): | ||
m.d.comb += clk.eq(clk_edge == 0x1) | ||
elif self.clk_edge in ("f", "falling"): | ||
m.d.comb += clk.eq(clk_edge == 0x2) | ||
else: | ||
assert False | ||
|
||
# allow frame edge detection... sampled on I2S rising clock | ||
# frame_edge == 0x1 / 0b001 <-- rising edge, 0-bit shift (i.e: PCM) | ||
# frame_edge == 0x6 / 0b110 <-- falling edge, 0-bit shift (i.e: PCM) | ||
# frame_edge == 0x3 / 0b011 <-- rising edge, 1-bit shift (i.e: I2S) | ||
# frame_edge == 0x4 / 0b100 <-- falling edge, 1-bit shift (i.e: I2S) | ||
frame_edge = Signal(3) | ||
word_start = Signal() | ||
with m.If(clk): | ||
m.d.sync += frame_edge.eq(Cat(self.pads.fs_t.i, frame_edge[:-1])) | ||
if self.mode == "i2s": | ||
m.d.comb += word_start.eq((frame_edge == 0x4) | (frame_edge == 0x3)) | ||
elif self.mode == "pcm": | ||
m.d.comb += word_start.eq((frame_edge == 0x6) | (frame_edge == 0x1)) | ||
else: | ||
assert False | ||
|
||
data_reg = Signal(8) | ||
data_chk = Signal(8) | ||
data_bit_num = Signal(range(32)) | ||
|
||
# per sample, the following will be placed in the FIFO: | ||
# HEADER 0xC0 | FS 0b1100000? to maintain sync and channel identity | ||
# PCM DATA 0x?? raw sample data from the wire | ||
# : repeated as necessary | ||
# CHECKSUM 0x?? XOR of each 8-bit data chunk | ||
|
||
with m.FSM(): | ||
with m.State("IDLE"): | ||
m.d.sync += [ | ||
self.in_fifo.w_en.eq(0), | ||
data_reg.eq(0), | ||
data_chk.eq(0), | ||
data_bit_num.eq(self.frame_size-1), | ||
] | ||
with m.If(word_start): | ||
m.next = "HDR_TX" | ||
|
||
with m.State("HDR_TX"): | ||
m.d.sync += self.in_fifo.w_data.eq(Cat(frame_edge[0], Const(0xC0 >> 1, unsigned(7)))), | ||
with m.If(self.in_fifo.w_rdy): | ||
m.d.sync += self.in_fifo.w_en.eq(1), | ||
m.next = "DATA_SAMPLE" | ||
|
||
with m.State("DATA_SAMPLE"): | ||
m.d.sync += [ | ||
self.in_fifo.w_en.eq(0), | ||
data_reg.eq(Cat(self.pads.data_t.i, data_reg[:-1])), # MSB first | ||
] | ||
with m.If((data_bit_num % 8) == 0): | ||
# transmit data at every byte boundary | ||
m.next = "DATA_TX" | ||
with m.Else(): | ||
m.next = "DATA_WAIT" | ||
|
||
with m.State("DATA_WAIT"): | ||
with m.If(clk): | ||
m.d.sync += data_bit_num.eq(data_bit_num - 1) | ||
m.next = "DATA_SAMPLE" | ||
|
||
with m.State("DATA_TX"): | ||
with m.If(self.in_fifo.w_rdy): | ||
m.d.sync += [ | ||
data_chk.eq(data_chk ^ data_reg[:8]), | ||
self.in_fifo.w_data.eq(data_reg[:8]), | ||
self.in_fifo.w_en.eq(1), | ||
] | ||
m.next = "DATA_DONE" | ||
|
||
with m.State("DATA_DONE"): | ||
m.d.sync += self.in_fifo.w_en.eq(0) | ||
with m.If(data_bit_num != 0): | ||
m.next = "DATA_WAIT" | ||
with m.Else(): | ||
m.next = "CHK_TX" | ||
|
||
with m.State("CHK_TX"): | ||
with m.If(self.in_fifo.w_rdy): | ||
m.d.sync += [ | ||
self.in_fifo.w_data.eq(data_chk), | ||
self.in_fifo.w_en.eq(1), | ||
] | ||
m.next = "IDLE" | ||
|
||
return m | ||
|
||
|
||
class AudioI2SCaptureApplet(GlasgowApplet, name='audio-i2s-capture'): | ||
logger = logging.getLogger(__name__) | ||
preview = True | ||
help = "capture I2S audio to a WAV file" | ||
description = """ | ||
Capture stereo audio that is transmitted via I2S. | ||
""" | ||
required_revision = "C0" | ||
|
||
__pins = ("clk", "fs", "data") | ||
|
||
@classmethod | ||
def add_build_arguments(cls, parser, access): | ||
super().add_build_arguments(parser, access) | ||
|
||
for pin in cls.__pins: | ||
access.add_pin_argument(parser, pin, default=True) | ||
|
||
parser.add_argument( | ||
"--clk-edge", metavar="EDGE", type=str, choices=["r", "rising", "f", "falling"], | ||
default="rising", | ||
help="latch data at clock edge EDGE (default: %(default)s)") | ||
parser.add_argument( | ||
"--frame-size", metavar="SIZE", type=int, default=16, | ||
help="the size (in bits) of each sample value (default: %(default)s)") | ||
parser.add_argument( | ||
"--mode", metavar="MODE", type=str, choices=["i2s", "pcm"], | ||
default="i2s", | ||
help="I2S mode expects a bit shift of 1 between FS and the data. " | ||
"PCM mode expects a bit shift of 0 between FS and the data. " | ||
"(default: %(default)s)") | ||
|
||
# TODO: auto sample size | ||
|
||
def build(self, target, args): | ||
self.mux_interface = iface = target.multiplexer.claim_interface(self, args) | ||
target.submodules += AudioI2SCaptureSubtarget( | ||
pads=iface.get_pads(args, pins=self.__pins), | ||
in_fifo=iface.get_in_fifo(), | ||
clk_edge=args.clk_edge, | ||
mode=args.mode, | ||
frame_size=args.frame_size, | ||
) | ||
|
||
@classmethod | ||
def add_run_arguments(cls, parser, access): | ||
super().add_run_arguments(parser, access) | ||
|
||
async def run(self, device, args): | ||
iface = await device.demultiplexer.claim_interface(self, self.mux_interface, args) | ||
|
||
d_prev = 0 | ||
while True: | ||
x = await iface.read() | ||
#print(f'{x=} {len(x)=}') | ||
for i, d in enumerate(x): | ||
if d == 0x00: | ||
continue | ||
print(f' {i}: 0x{d:02X}') |