Skip to content

Commit

Permalink
Rework StreamBuffer and make it the core transaction primitive
Browse files Browse the repository at this point in the history
This reworks StreamBuffer, and makes it the core primitive
for transacting via read and write. It is now aware of the Device MTU,
and will align to the MTU. It also reworks the time stamps to align to
each transaction, supporting multiple time stamps per buffer for latency
analysis. We also allow the full vector type to be a Parameter,
to support other array types.
  • Loading branch information
sjkelly committed Sep 13, 2021
1 parent ea33e4c commit 5b73691
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 46 deletions.
35 changes: 35 additions & 0 deletions examples/rapid_read.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using SoapySDR, SoapyRTLSDR_jll

# Here we want to test the behavior in a loop, to ensure
# that we can block on buffer overflow conditions, and
# handle partial reads, measure latnecy, etc

function rapid_read()
dev = Devices()[1]
rx_chan = dev.rx
rx_stream = SoapySDR.Stream(rx_chan)
@show SoapySDR.mtu(rx_stream)
SoapySDR.activate!(rx_stream)
bufs = [SoapySDR.SampleBuffer(rx_stream, 10^6) for i = 1:2]
@show bufs[1].packet_count
@show bufs[2].packet_count
flip = true
while true
# double buffer
flip = !flip
current_buff = bufs[Int(flip)+1]
prev_buff = bufs[Int(!flip)+1]
@assert length(current_buff.bufs[1])%rx_stream.mtu == 0

read!(rx_stream, current_buff)

# sanity checks?
#nequal = 0
#for i in eachindex(current_buff.bufs)
# nequal += Int(current_buff.bufs[1][i] == prev_buff.bufs[1][i])
#end
#@show current_buff.timens
#@show nequal, current_buff.timens, delta_t
end

end
141 changes: 95 additions & 46 deletions src/highlevel.jl
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ Base.cconvert(::Type{<:Ptr{SoapySDRStream}}, s::Stream) = s
Base.unsafe_convert(::Type{<:Ptr{SoapySDRStream}}, s::Stream) = s.ptr
SoapySDRDevice_closeStream(s::Stream) = SoapySDRDevice_closeStream(s.d, s)

streamtype(::Stream{T}) where T = T
mtu(s::Stream) = s.mtu

function Base.show(io::IO, s::Stream)
print(io, "Stream on ", s.d.hardware)
end
Expand Down Expand Up @@ -528,60 +531,92 @@ function Stream(channels::AbstractVector{T}; kwargs...) where {T <: Channel}
Stream(native_format, channels; kwargs...)
end

function _read!(s::Stream{T}, buffers::NTuple{N, Vector{T}}; all=true, timeout=nothing) where {N, T}
timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream
buflen = length(first(buffers))
@assert all(buffer->length(buffer) == buflen, buffers)
@assert N == s.nchannels
nread, flags, timens = SoapySDRDevice_readStream(s.d, s, Ref(map(pointer, buffers)), buflen, uconvert(u"μs", timeout).val)
timens = timens * u"ns"
nread, flags, timens
end
"""
SampleBuffer(s::SoapySDR.Stream)
SampleBuffer(s::SoapySDR.Stream, n::Int)
function Base.read!(s, buffers; kwargs...)
_read!(s, buffers; kwargs...)[1]
end
Constructs a sample buffer for a given stream. Can contain multiple channels and be of arbitrary length.
To avoid undefined behavior, this requested length with be aligned to the device MTU. It is therefore
important to ensure that subsequent calls and calculations use this length.
"""
SampleBuffer{N, T}
Returns a `SampleBuffer{N,T}` with fields:
bufs::NTuple{N, T}
packet_count::Int
timens::Vector{Pair{Int,typeof(1u"ns")}}
Represents a buffer of samples, of `N` channels producing data of type `T`.
where N is the number of channels and T is the vector type of the buffer (default: Vector).
Fields:
bufs::NTuple{N, Vector{T}}
flags::Cint
timens::typeof(1u"ns")
`bufs` are the buffers for each channel.
`length` length of the buffer.
`packet_count` are the number of transactions of MTU size required by subsequent `read` and `write` operations.
`timens` are the offset and time stamp pairs for each packet.
"""
struct SampleBuffer{N, T}
bufs::NTuple{N, Vector{T}}
flags::Cint
timens::typeof(1u"ns")
bufs::NTuple{N, T}
length::Int
packet_count::Int
timens::Vector{Pair{Int, typeof(1u"ns")}}
end
Base.length(sb::SampleBuffer) = length(first(sb.bufs))

SampleBuffer(s::Stream) = SampleBuffer(s, s.mtu)

function SampleBuffer(s::Stream{T}, length; round::RoundingMode{RM}=RoundDown, vectortype=Vector) where {T, RM}

# align to MTU
overrun = length%s.mtu
realigned = false
if length < s.mtu
length = s.mtu
realigned = true
elseif length > s.mtu && overrun != 0
length = if RM == :Down
length - overrun
elseif RM == :Up
length + s.mtu - overrun
end
realigned = true
end
if realigned
@info "requested 'length' is not aligned to MTU! Aligning to length of $(length) samples"
@info "get MTU with SoapySDR.mtu(::Stream)."
end

packet_count = length/s.mtu
bufs = ntuple(_->vectortype{T}(undef, length), s.nchannels)
SampleBuffer(bufs, length, Int(packet_count), Vector{Pair{Int, typeof(1u"ns")}}(undef, length))
end
Base.length(sb::SampleBuffer) = length(sb.bufs[1])

"""
read(s::SoapySDR.Stream, nb::Integer; all=true)

Read at most nb bytes from s, returning a `SampleBuffer`
"""
read!(s::SoapySDR.Stream, buf::SampleBuffer; timeout::Int)
If all is true (the default), this function will block repeatedly trying to read all requested bytes, until an error or
end-of-file occurs. If all is false, at most one read call is performed, and the amount of data returned is device-dependent.
Note that not all stream types support the all option.
Read data from the device into the given buffer.
"""
function Base.read(s::Stream{T}, n::Int; all=true, kwargs...) where {T}
bufs = ntuple(_->Vector{T}(undef, n), s.nchannels)
nread, flags, timens = _read!(s, bufs; kwargs...)
# By definition of read, we can allow fewer samples than requested, unless all=false
if nread != n && all
@info "could not read requested length, suggest using read(...;all=false)"
@info("assertion debugging", nread, n)
@assert nread == n
function Base.read!(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout=nothing) where {N, T, VT <: AbstractVector{T}}
timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream

# check length did not change
for i in eachindex(samplebuffer.bufs)
@assert length(samplebuffer.bufs[i]) == samplebuffer.length
end
SampleBuffer(bufs, flags, timens)
end

function Base.read(s::Stream; all=true, kwargs...)
read(s, s.mtu; all=true, kwargs...)
for packet in 1:samplebuffer.packet_count
offset = (packet-1)*s.mtu
@show offset
nread, flags, timens = SoapySDRDevice_readStream(s.d, s, Ref(map(b -> pointer(b, offset), samplebuffer.bufs)), s.mtu, uconvert(u"μs", timeout).val)
timens = timens * u"ns"

@assert flags & SOAPY_SDR_MORE_FRAGMENTS == 0

if nread != s.mtu
@info("assertion debugging", nread, n)
@assert nread == n
end

samplebuffer.timens[packet] = (offset => timens)
end
samplebuffer
end

function activate!(s::Stream; flags = 0, timens = nothing, numElems=0)
Expand All @@ -594,12 +629,26 @@ function deactivate!(s::Stream; flags = 0, timens = nothing)
nothing
end

function Base.write(s::Stream{T}, buffers::NTuple{N, Vector{T}}; timeout = nothing) where {N, T}
function Base.write(s::Stream{T}, samplebuffer::SampleBuffer{N, VT}; timeout = nothing) where {N, T, VT <: AbstractVector{T}}
timeout === nothing && (timeout = 0.1u"s") # Default from SoapySDR upstream
buflen = length(first(buffers))
@assert all(buffer->length(buffer) == buflen, buffers)
@assert N == s.nchannels
SoapySDRDevice_writeStream(s.d, s, Ref(map(pointer, buffers)), buflen, 0, 0, uconvert(u"μs", timeout).val)

# check length did not change
for i in eachindex(samplebuffer.bufs)
@assert length(samplebuffer.bufs[i]) == samplebuffer.length
end

for packet in 1:samplebuffer.packet_count
offset = (packet-1)*s.mtu

nelem, flags = SoapySDRDevice_writeStream(s.d, s, Ref(map(b -> pointer(b, offset), samplebuffer.bufs)), s.mtu, 0, 0, uconvert(u"μs", timeout).val)

if nelem != s.mtu
@info("assertion debugging", nelem, n)
@assert nelem == n
end

end
samplebuffer
end


Expand Down

0 comments on commit 5b73691

Please sign in to comment.