Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Large buffers #22

Merged
merged 4 commits into from
Sep 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 39 additions & 35 deletions docs/src/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,40 +39,44 @@ julia> using SoapySDR, SoapyRTLSDR_jll

### Transmitting and Receiving

TX:
```
# Open first TX-capable channel on first device
channel = Devices()[1].tx[1]

# Configure channel with appropriate parameters
channel.bandwidth = 800u"kHz"
channel.frequency = 30u"MHz"
channel.gain = 42u"dB"
channel.sample_rate = 2.1u"MHz"

# Open a (potentially multichannel) stream on this channel
stream = SoapySDR.Stream([channel])
SoapySDR.activate!(stream)

# Write out random noise
Base.write(stream, (randn(ComplexF32, 10000),))
```

RX:
```
# Open first RX-capable channel on first device
channel = Devices()[1].rx[1]

# Configure channel with appropriate parameters
channel.bandwidth = 800u"kHz"
channel.frequency = 30u"MHz"
channel.gain = 42u"dB"
channel.sample_rate = 2.1u"MHz"

# Open a (potentially multichannel) stream on this channel
stream = SoapySDR.Stream([channel])
SoapySDR.activate!(stream)

# Collect all available samples in the buffer
Base.read(stream)
# Open all TX-capable channels on first device
tx_channels = Devices()[1].tx

# Open all RX-capable channels on first device
rx_channels = Devices()[1].rx

# Configure a TX channel with appropriate parameters
# configure the RX channel with similar for e.g. a loopback test
# Be sure to check your local regulations before transmitting!
tx_channel[1].bandwidth = 800u"kHz"
tx_channel[1].frequency = 30u"MHz"
tx_channel[1].gain = 42u"dB"
tx_channel[1].sample_rate = 2.1u"MHz"

# Open a (potentially multichannel) stream on the channels
tx_stream = SoapySDR.Stream(tx_channels)
rx_stream = SoapySDR.Stream(rx_channels)

# Setup a sample buffer optimized for the device
# The data can be access with e.g. tx_buf.bufs
# Note: we ask for 10,000 samples, but the API will re-size correctly for the device
tx_buf = SoapySDR.SampleBuffer(tx_stream, 10_000)
rx_buf = SoapySDR.SampleBuffer(rx_stream, 10_000)

# Setup some data to transmit on each channel
for i in eachindex(tx_buf)
tx_buf[i] = randn(SoapySDR.streamtype(tx_stream), length(tx_buf))
end

# Spawn two tasks for full duplex operation
# The tasks will run in parallel and for best resuslts run julia with --threads=auto
read_task = Threads.@spawn read!(rx_stream, rx_buf)
write_task = Threads.@spawn write(tx_stream, tx_buf)

# Wait for the tasks to complete
wait(read_task)
wait(write_task)

@show rx_buf[1][1:100] # show the first 100 samples of the first buffer
```
35 changes: 35 additions & 0 deletions examples/rapid_read.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using SoapySDR, SoapyRTLSDR_jll

# Here we want to test the behavior in a loop, to ensure
# that we can block on buffer overflow conditions, and
# handle partial reads, measure latnecy, etc

function rapid_read()
dev = Devices()[1]
rx_chan = dev.rx
rx_stream = SoapySDR.Stream(rx_chan)
@show SoapySDR.mtu(rx_stream)
SoapySDR.activate!(rx_stream)
bufs = [SoapySDR.SampleBuffer(rx_stream, 10^6) for i = 1:2]
@show bufs[1].packet_count
@show bufs[2].packet_count
flip = true
while true
# double buffer
flip = !flip
current_buff = bufs[Int(flip)+1]
prev_buff = bufs[Int(!flip)+1]
@assert length(current_buff.bufs[1])%rx_stream.mtu == 0

read!(rx_stream, current_buff)

# sanity checks?
#nequal = 0
#for i in eachindex(current_buff.bufs)
# nequal += Int(current_buff.bufs[1][i] == prev_buff.bufs[1][i])
#end
#@show current_buff.timens
#@show nequal, current_buff.timens, delta_t
end

end
145 changes: 106 additions & 39 deletions src/highlevel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ Base.cconvert(::Type{<:Ptr{SoapySDRStream}}, s::Stream) = s
Base.unsafe_convert(::Type{<:Ptr{SoapySDRStream}}, s::Stream) = s.ptr
SoapySDRDevice_closeStream(s::Stream) = SoapySDRDevice_closeStream(s.d, s)

streamtype(::Stream{T}) where T = T
mtu(s::Stream) = s.mtu

function Base.show(io::IO, s::Stream)
print(io, "Stream on ", s.d.hardware)
end
Expand Down Expand Up @@ -528,50 +531,97 @@ function Stream(channels::AbstractVector{T}; kwargs...) where {T <: Channel}
Stream(native_format, channels; kwargs...)
end

function _read!(s::Stream{T}, buffers::NTuple{N, Vector{T}}; timeout=nothing) where {N, T}
timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream
buflen = length(first(buffers))
@assert all(buffer->length(buffer) == buflen, buffers)
@assert N == s.nchannels
nread, flags, timens = SoapySDRDevice_readStream(s.d, s, Ref(map(pointer, buffers)), buflen, uconvert(u"μs", timeout).val)
timens = timens * u"ns"
nread, flags, timens
end
"""
SampleBuffer(s::SoapySDR.Stream)
SampleBuffer(s::SoapySDR.Stream, n::Int)

function Base.read!(s, buffers; kwargs...)
_read!(s, buffers; kwargs...)[1]
end
Constructs a sample buffer for a given stream. Can contain multiple channels and be of arbitrary length.
To avoid undefined behavior, this requested length with be aligned to the device MTU. It is therefore
important to ensure that subsequent calls and calculations use this length.

Returns a `SampleBuffer{N,T}` with fields:
bufs::NTuple{N, T}
packet_count::Int
timens::Vector{Pair{Int,typeof(1u"ns")}}

where N is the number of channels and T is the vector type of the buffer (default: Vector).

`bufs` are the buffers for each channel.
`length` length of the buffer.
`packet_count` are the number of transactions of MTU size required by subsequent `read` and `write` operations.
`timens` are the offset and time stamp pairs for each packet.
"""
struct SampleBuffer{N, T}
bufs::NTuple{N, Vector{T}}
flags::Cint
timens::typeof(1u"ns")
bufs::NTuple{N, T}
length::Int
packet_count::Int
timens::Vector{Pair{Int, typeof(1u"ns")}}
end
Base.length(sb::SampleBuffer) = length(first(sb.bufs))
Base.getindex(sb::SampleBuffer, i::Int) = sb.bufs[i]
Base.setindex(sb::SampleBuffer, i::Int, v) = (sb.bufs[i] = v)
Base.eachindex(::SampleBuffer{N,T}) where {N, T} = 1:N
SampleBuffer(s::Stream) = SampleBuffer(s, s.mtu)

function SampleBuffer(s::Stream{T}, length; round::RoundingMode{RM}=RoundDown, vectortype=Vector) where {T, RM}

# align to MTU
overrun = length%s.mtu
realigned = false
if length < s.mtu
length = s.mtu
realigned = true
elseif length > s.mtu && overrun != 0
length = if RM == :Down
length - overrun
elseif RM == :Up
length + s.mtu - overrun
end
realigned = true
end
if realigned
@info "requested 'length' is not aligned to MTU! Aligning to length of $(length) samples"
@info "get MTU with SoapySDR.mtu(::Stream)."
end

packet_count = Int(length/s.mtu)
bufs = ntuple(_->vectortype{T}(undef, length), s.nchannels)
SampleBuffer(bufs, length, packet_count, Vector{Pair{Int, typeof(1u"ns")}}(undef, packet_count))
end
Base.length(sb::SampleBuffer) = length(sb.bufs[1])

"""
read(s::SoapySDR.Stream, nb::Integer; all=true)

Read at most nb bytes from s, returning a `SampleBuffer`
"""
read!(s::SoapySDR.Stream, buf::SampleBuffer; timeout::Int)

If all is true (the default), this function will block repeatedly trying to read all requested bytes, until an error or
end-of-file occurs. If all is false, at most one read call is performed, and the amount of data returned is device-dependent.
Note that not all stream types support the all option.
Read data from the device into the given buffer.
"""
function Base.read(s::Stream{T}, n::Int; all=true, kwargs...) where {T}
bufs = ntuple(_->Vector{T}(undef, n), s.nchannels)
nread, flags, timens = _read!(s, bufs; kwargs...)
# By definition of read, we can allow fewer samples than requested, unless all=false
if nread != n && all
@info "could not read requested length, suggest using read(...;all=false)"
@info("assertion debugging", nread, n)
@assert nread == n
function Base.read!(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout=nothing, activate=true, deactivate=true) where {N, T, VT <: AbstractVector{T}}
timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream

# check length did not change
for i in eachindex(samplebuffer.bufs)
@assert length(samplebuffer.bufs[i]) == samplebuffer.length
end
SampleBuffer(bufs, flags, timens)
end

function Base.read(s::Stream; all=true, kwargs...)
read(s, s.mtu; all=true, kwargs...)
activate && activate!(s)
for packet in 1:samplebuffer.packet_count
offset = (packet-1)*s.mtu
@show offset
nread, flags, timens = SoapySDRDevice_readStream(s.d, s, Ref(map(b -> pointer(b, offset), samplebuffer.bufs)), s.mtu, uconvert(u"μs", timeout).val)
timens = timens * u"ns"

@assert flags & SOAPY_SDR_MORE_FRAGMENTS == 0

if nread != s.mtu
@info("assertion debugging", nread, n)
@assert nread == n
end

samplebuffer.timens[packet] = (offset => timens)
end
deactivate && deactivate!(s)

samplebuffer
end

function activate!(s::Stream; flags = 0, timens = nothing, numElems=0)
Expand All @@ -584,12 +634,29 @@ function deactivate!(s::Stream; flags = 0, timens = nothing)
nothing
end

function Base.write(s::Stream{T}, buffers::NTuple{N, Vector{T}}; timeout = nothing) where {N, T}
function Base.write(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout = nothing, activate=true, deactivate=true) where {N, T, VT <: AbstractVector{T}}
timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream
buflen = length(first(buffers))
@assert all(buffer->length(buffer) == buflen, buffers)
@assert N == s.nchannels
SoapySDRDevice_writeStream(s.d, s, Ref(map(pointer, buffers)), buflen, 0, 0, uconvert(u"μs", timeout).val)

# check length did not change
for i in eachindex(samplebuffer.bufs)
@assert length(samplebuffer.bufs[i]) == samplebuffer.length
end

activate && activate!(s)
for packet in 1:samplebuffer.packet_count
offset = (packet-1)*s.mtu

nelem, flags = SoapySDRDevice_writeStream(s.d, s, Ref(map(b -> pointer(b, offset), samplebuffer.bufs)), s.mtu, 0, 0, uconvert(u"μs", timeout).val)

if nelem != s.mtu
@info("assertion debugging", nelem, n)
@assert nelem == n
end

end
deactivate && deactivate!(s)

samplebuffer
end


Expand Down